Java学习笔记之dubbo服务之底层通讯协议Protocol
我们先来找到通讯协议的入口点吧。通过Protocol接口查找通讯协议入口点,我们根据接口的export方法搜索发现入口了,在ServiceConfig的doExportUrlsFor1Protocol方法,如下图:
然后我们进入 protocol.export(invoker)方法发现有很多实现类,根据spi(不懂的请看之前写的容器篇)查看配置文件能找到如下
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol???//这个是默认的,我们在Protocol接口上可以看到spi的注解
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
进入DubboProtocol.export(Invoker
代码:
??private?void?openServer(URL?url)?{
????????//?find?server.
????????String?key?=?url.getAddress();
????????//client?也可以暴露一个只有server可以调用的服务。
????????boolean?isServer?=?url.getParameter(Constants.IS_SERVER_KEY,true);
????????if?(isServer)?{
????????ExchangeServer?server?=?serverMap.get(key);
????????if?(server?==?null)?{
????????serverMap.put(key,?createServer(url));?//createServer是创建服务
????????}?else?{
????????//server支持reset,配合override功能使用
????????server.reset(url);
????????}
????????}
????}
继续进入createServer,上源码
????private?ExchangeServer?createServer(URL?url)?{
????????//默认开启server关闭时发送readonly事件
????????url?=?url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,?Boolean.TRUE.toString());
????????//默认开启heartbeat
????????url?=?url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,?String.valueOf(Constants.DEFAULT_HEARTBEAT));
????????String?str?=?url.getParameter(Constants.SERVER_KEY,?Constants.DEFAULT_REMOTING_SERVER);
????????if?(str?!=?null?&&?str.length()?>?0?&&?!?ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
????????????throw?new?RpcException("Unsupported?server?type:?"?+?str?+?",?url:?"?+?url);
????????url?=?url.addParameter(Constants.CODEC_KEY,?Version.isCompatibleVersion()???COMPATIBLE_CODEC_NAME?:?DubboCodec.NAME);
????????ExchangeServer?server;
????????try?{
????????????server?=?Exchangers.bind(url,?requestHandler);
????????}?catch?(RemotingException?e)?{
????????????throw?new?RpcException("Fail?to?start?server(url:?"?+?url?+?")?"?+?e.getMessage(),?e);
????????}
????????str?=?url.getParameter(Constants.CLIENT_KEY);
????????if?(str?!=?null?&&?str.length()?>?0)?{
????????????Set
????????????if?(!supportedTypes.contains(str))?{
????????????????throw?new?RpcException("Unsupported?client?type:?"?+?str);
????????????}
????????}
????????return?server;
????}
? ? dubbo从要暴漏的服务的URL中取得相关的配置(host,port等)进行服务端server的创建,同上面的server = Exchangers.bind(url, requestHandler) 正式创建服务。
? ? 所以基本的创建步骤是
?? export() ?--> ?openServer() ?--> ?createServer() ?--> ?server = Exchangers.bind(url, requestHandler); ?
? 我们进行来看 Exchangers.bind(url, requestHandler)
? 源码:
public?static?ExchangeServer?bind(URL?url,?ExchangeHandler?handler)?throws?RemotingException?{
??????if?(url?==?null)?{
??????????throw?new?IllegalArgumentException("url?==?null");
??????}
??????if?(handler?==?null)?{
??????????throw?new?IllegalArgumentException("handler?==?null");
??????}
??????url?=?url.addParameterIfAbsent(Constants.CODEC_KEY,?"exchange");
??????return?getExchanger(url).bind(url,?handler);
??}
? 然后通过getExchanger(url).bind(url, handler)的bing进入 HeaderExchanger类
??public?ExchangeServer?bind(URL?url,?ExchangeHandler?handler)?throws?RemotingException?{
????????return?new?HeaderExchangeServer(Transporters.bind(url,?new?DecodeHandler(new?HeaderExchangeHandler(handler))));
????}
? ? 在进入Transporters类的bing的
??public?static?Server?bind(URL?url,?ChannelHandler...?handlers)?throws?RemotingException?{
?????????if?(url?==?null)?{
?????????????throw?new?IllegalArgumentException("url?==?null");
?????????}
?????????if?(handlers?==?null?||?handlers.length?==?0)?{
?????????????throw?new?IllegalArgumentException("handlers?==?null");
?????????}
?????????ChannelHandler?handler;
?????????if?(handlers.length?==?1)?{
?????????????handler?=?handlers[0];
?????????}?else?{
?????????????handler?=?new?ChannelHandlerDispatcher(handlers);
?????????}
?????????return?getTransporter().bind(url,?handler);
?????}
通过bing可以知道他讲调用:GrizzlyTransporter,MinaTransporter,NettyTransporter 通过spi默认是调用NettyTransporter
? ? ?到这里我们基本明白dubbo的通讯默认是交给了netty来处理,
? ? ?我们在看下doOPen方法
@Override
????????protected?void?doOpen()?throws?Throwable?{
????????????NettyHelper.setNettyLoggerFactory();
????????????ExecutorService?boss?=?Executors.newCachedThreadPool(new?NamedThreadFactory("NettyServerBoss",?true));
????????????ExecutorService?worker?=?Executors.newCachedThreadPool(new?NamedThreadFactory("NettyServerWorker",?true));
????????????ChannelFactory?channelFactory?=?new?NioServerSocketChannelFactory(boss,?worker,?getUrl().getPositiveParameter(Constants.IO_THREADS_KEY,?Constants.DEFAULT_IO_THREADS));
????????????bootstrap?=?new?ServerBootstrap(channelFactory);
????????????final?NettyHandler?nettyHandler?=?new?NettyHandler(getUrl(),?this);
????????????channels?=?nettyHandler.getChannels();
????????????//?https://issues.jboss.org/browse/NETTY-365
????????????//?https://issues.jboss.org/browse/NETTY-379
????????????//?final?Timer?timer?=?new?HashedWheelTimer(new?NamedThreadFactory("NettyIdleTimer",?true));
????????????bootstrap.setPipelineFactory(new?ChannelPipelineFactory()?{
????????????????public?ChannelPipeline?getPipeline()?{
????????????????????NettyCodecAdapter?adapter?=?new?NettyCodecAdapter(getCodec()?,getUrl(),?NettyServer.this);
????????????????????ChannelPipeline?pipeline?=?Channels.pipeline();
????????????????????/*int?idleTimeout?=?getIdleTimeout();
????????????????????if?(idleTimeout?>?10000)?{
????????????????????????pipeline.addLast("timer",?new?IdleStateHandler(timer,?idleTimeout?/?1000,?0,?0));
????????????????????}*/
????????????????????pipeline.addLast("decoder",?adapter.getDecoder());//解码
????????????????????pipeline.addLast("encoder",?adapter.getEncoder());//编码
????????????????????pipeline.addLast("handler",?nettyHandler);
????????????????????return?pipeline;
????????????????}
????????????});
????????????//?bind
????????????channel?=?bootstrap.bind(getBindAddress());
????????}
?了解netty的同学,肯定早已习惯这个方法的写法,就是创建了netty的server嘛,到这里dubbo的服务创建完毕了,这个时候控制台见打印:
?[DUBBO]?Start?NettyServer?bind?/0.0.0.0:20880,?export?/192.168.4.241:20880,?dubbo?version:?2.8.4,?current?host:?127.0.0.1