Java Study Notes: Dubbo Services and the Underlying Communication Protocol (Protocol)

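Let's start by locating the entry point of the communication protocol. We look for it through the Protocol interface: searching for callers of the interface's export method leads us to the doExportUrlsFor1Protocol method of ServiceConfig.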

 

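Stepping into protocol.export(invoker), we find that it has many implementation classes. Following the SPI mechanism (see the earlier article on containers if this is unfamiliar), the configuration file lists the following: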

 

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol

dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol   // this is the default; see the @SPI annotation on the Protocol interface

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper

listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper

mock=com.alibaba.dubbo.rpc.support.MockProtocol

injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol

rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol

hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol

com.alibaba.dubbo.rpc.protocol.http.HttpProtocol

com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol

thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol

memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol

redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol

rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
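To make the name-to-class lookup concrete, here is a minimal sketch of how a `name=class` file like the one above could be read into a table. It is not Dubbo's ExtensionLoader: the class `SpiFileSketch` and its `parse` method are hypothetical, and deriving a name for lines without a key (by stripping the `Protocol` suffix from the simple class name) is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of Dubbo: turns "name=class" lines into a map.
final class SpiFileSketch {

    static Map<String, String> parse(String content, String typeSuffix) {
        Map<String, String> extensions = new LinkedHashMap<>();
        for (String raw : content.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comment lines
            }
            int eq = line.indexOf('=');
            String name;
            String className;
            if (eq > 0) {
                name = line.substring(0, eq).trim();
                className = line.substring(eq + 1).trim();
            } else {
                // Assumption: a line without a key takes its name from the simple
                // class name, e.g. "HttpProtocol" -> "http".
                className = line;
                String simple = className.substring(className.lastIndexOf('.') + 1);
                if (simple.endsWith(typeSuffix)) {
                    simple = simple.substring(0, simple.length() - typeSuffix.length());
                }
                name = simple.toLowerCase(Locale.ROOT);
            }
            extensions.put(name, className);
        }
        return extensions;
    }

    public static void main(String[] args) {
        String file = "dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol\n"
                + "com.alibaba.dubbo.rpc.protocol.http.HttpProtocol\n";
        System.out.println(parse(file, "Protocol"));
    }
}
```

With a table like this, choosing the protocol for a URL reduces to a lookup by name, with "dubbo" as the fallback when nothing else is configured.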

Inside DubboProtocol.export(Invoker invoker) there is a call to openServer(url);

Code:

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        // a client can also export a service that only the server can invoke.
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url)); // createServer creates the server
            } else {
                // the server supports reset, used together with the override feature
                server.reset(url);
            }
        }
    }
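The effect of openServer is that every service exported on the same address shares one server. The sketch below reproduces only that get-or-create logic with plain JDK types. `ServerCacheSketch` and `FakeServer` are hypothetical stand-ins (the real code stores ExchangeServer instances and passes the URL to reset), so treat it as an illustration rather than Dubbo's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of the "one server per address" cache.
final class ServerCacheSketch {

    // Stand-in for ExchangeServer; it only counts how often reset() is called.
    static final class FakeServer {
        final String address;
        int resetCount = 0;

        FakeServer(String address) {
            this.address = address;
        }

        void reset() {
            resetCount++;
        }
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    int created = 0;

    FakeServer openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            // First export on this address: create and cache the server.
            server = new FakeServer(address);
            created++;
            serverMap.put(address, server);
        } else {
            // Later exports on the same address reuse the server and reset it.
            server.reset();
        }
        return server;
    }
}
```

Calling openServer twice with "192.168.4.241:20880" creates one server and resets it once; a different port gets its own server.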

 

Next, step into createServer. Here is the source:

 

    private ExchangeServer createServer(URL url) {
        // by default, send a readonly event when the server shuts down
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // heartbeat is enabled by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
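Note the repeated `url = url.addParameterIfAbsent(...)` pattern in createServer: the URL object is not modified in place, so the result has to be reassigned. The following sketch shows that style with an immutable parameter holder. `UrlParamsSketch` is a hypothetical class, and the keys and values used with it are illustrative only, not Dubbo's constants.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the parameter part of a Dubbo URL.
final class UrlParamsSketch {

    private final Map<String, String> params;

    UrlParamsSketch(Map<String, String> params) {
        // Defensive copy so that later changes to the argument cannot leak in.
        this.params = Collections.unmodifiableMap(new HashMap<>(params));
    }

    // Returns this instance if the key is already set, otherwise a new
    // instance that carries the extra parameter.
    UrlParamsSketch addParameterIfAbsent(String key, String value) {
        if (params.containsKey(key)) {
            return this;
        }
        Map<String, String> copy = new HashMap<>(params);
        copy.put(key, value);
        return new UrlParamsSketch(copy);
    }

    // Falls back to defaultValue when the key is missing or empty.
    String getParameter(String key, String defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }
}
```

A value configured by the user therefore wins over the default: adding "heartbeat" to a holder that already has it changes nothing, while a missing "codec" is filled in.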

 

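Dubbo reads the relevant configuration (host, port, and so on) from the URL of the service being exported and uses it to create the server. The call server = Exchangers.bind(url, requestHandler) shown above is where the server is actually created.

So the basic creation sequence is:

export() --> openServer() --> createServer() --> server = Exchangers.bind(url, requestHandler);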

 

 

Next, let's look at Exchangers.bind(url, requestHandler).

Source:

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

 

Then, through the bind call in getExchanger(url).bind(url, handler), we enter the HeaderExchanger class:

 

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
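The nested constructors in HeaderExchanger.bind form a decorator chain: the outermost handler sees a message first and passes it inward. Here is a small sketch of that ordering. All types in it (`Handler`, `DecodeSketch`, `HeaderExchangeSketch`, `BusinessHandler`) are hypothetical and only record the order of calls; they do not reproduce what Dubbo's handlers actually do with a message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of handler wrapping order.
final class HandlerChainSketch {

    interface Handler {
        void received(Object message, List<String> trace);
    }

    // Innermost handler, standing in for the requestHandler passed to bind.
    static final class BusinessHandler implements Handler {
        public void received(Object message, List<String> trace) {
            trace.add("business:" + message);
        }
    }

    // Middle layer, standing in for HeaderExchangeHandler.
    static final class HeaderExchangeSketch implements Handler {
        private final Handler next;

        HeaderExchangeSketch(Handler next) {
            this.next = next;
        }

        public void received(Object message, List<String> trace) {
            trace.add("exchange");
            next.received(message, trace);
        }
    }

    // Outermost layer, standing in for DecodeHandler.
    static final class DecodeSketch implements Handler {
        private final Handler next;

        DecodeSketch(Handler next) {
            this.next = next;
        }

        public void received(Object message, List<String> trace) {
            trace.add("decode");
            next.received(message, trace);
        }
    }

    // Builds the chain in the same nesting order as HeaderExchanger.bind.
    static List<String> dispatch(Object message) {
        Handler chain = new DecodeSketch(new HeaderExchangeSketch(new BusinessHandler()));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }
}
```

The trace for one message lists the decode layer first, then the exchange layer, then the business handler.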

Next, step into the bind method of the Transporters class:

 

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }
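Transporters.bind collapses its varargs into a single handler: one handler is used directly, several are wrapped in a dispatcher. A rough sketch of that idea follows, assuming the dispatcher simply forwards each event to every wrapped handler in order. `DispatcherSketch`, `Handler`, and `combine` are hypothetical names, not Dubbo classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of "one handler or a fan-out dispatcher".
final class DispatcherSketch {

    interface Handler {
        void received(String message);
    }

    // Forwards every message to all wrapped handlers, in registration order.
    static final class Dispatcher implements Handler {
        private final List<Handler> handlers;

        Dispatcher(Handler... handlers) {
            this.handlers = new ArrayList<>(Arrays.asList(handlers));
        }

        public void received(String message) {
            for (Handler handler : handlers) {
                handler.received(message);
            }
        }
    }

    // Mirrors the branching in Transporters.bind.
    static Handler combine(Handler... handlers) {
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        return handlers.length == 1 ? handlers[0] : new Dispatcher(handlers);
    }
}
```

In the HeaderExchanger call shown earlier only one handler is passed, so the single-handler branch is the one taken there.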

 

 

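From this bind call we can see that one of GrizzlyTransporter, MinaTransporter, or NettyTransporter will be invoked; through SPI, the default is NettyTransporter.

At this point it is basically clear that Dubbo hands communication over to Netty by default.

Now let's look at the doOpen method: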

 

        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
            ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
            ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
            bootstrap = new ServerBootstrap(channelFactory);

            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            channels = nettyHandler.getChannels();
            // https://issues.jboss.org/browse/NETTY-365
            // https://issues.jboss.org/browse/NETTY-379
            // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    /*int idleTimeout = getIdleTimeout();
                    if (idleTimeout > 10000) {
                        pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                    }*/
                    pipeline.addLast("decoder", adapter.getDecoder()); // decode
                    pipeline.addLast("encoder", adapter.getEncoder()); // encode
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
            // bind
            channel = bootstrap.bind(getBindAddress());
        }
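The order of the three addLast calls matters because a pipeline is traversed in opposite directions for incoming and outgoing data. The sketch below models only that traversal, without any Netty dependency. `PipelineSketch` and its `Direction` enum are hypothetical, and marking "handler" as inbound-only is a simplification made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of pipeline traversal order; it does not use Netty.
final class PipelineSketch {

    enum Direction { INBOUND, OUTBOUND }

    static final class Stage {
        final String name;
        final Direction direction;

        Stage(String name, Direction direction) {
            this.name = name;
            this.direction = direction;
        }
    }

    private final List<Stage> stages = new ArrayList<>();

    PipelineSketch addLast(String name, Direction direction) {
        stages.add(new Stage(name, direction));
        return this;
    }

    // Incoming bytes travel from the head of the list to the tail,
    // visiting inbound stages only.
    List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Stage stage : stages) {
            if (stage.direction == Direction.INBOUND) {
                path.add(stage.name);
            }
        }
        return path;
    }

    // Outgoing messages travel from the tail to the head,
    // visiting outbound stages only.
    List<String> outboundPath() {
        List<String> path = new ArrayList<>();
        for (int i = stages.size() - 1; i >= 0; i--) {
            if (stages.get(i).direction == Direction.OUTBOUND) {
                path.add(stages.get(i).name);
            }
        }
        return path;
    }

    // Same registration order as in doOpen: decoder, encoder, handler.
    static PipelineSketch dubboLike() {
        return new PipelineSketch()
                .addLast("decoder", Direction.INBOUND)
                .addLast("encoder", Direction.OUTBOUND)
                .addLast("handler", Direction.INBOUND);
    }
}
```

Under these assumptions a request is decoded before it reaches the handler, and a response written from the handler passes through the encoder on its way out.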

 

Anyone familiar with Netty will recognize this pattern right away: it simply creates a Netty server. At this point the Dubbo server has been created, and the console prints:

[DUBBO] Start NettyServer bind /0.0.0.0:20880, export /192.168.4.241:20880, dubbo version: 2.8.4, current host: 127.0.0.1