比如我们通过spring 得到一个服务端对象,那么他是如何与服务端进行通讯的呢?
ApplicationContext context = new ClassPathXmlApplicationContext("/dubbo/user-clent.xml"); UserSerice userSerice = (UserService) context.getBean("userService"); User user = userSerice.getUserById(userId);上述代码两服务之间调用流程图如下:
从服务发布的代码中可以看出,会绑定一个handler,代码如下:
bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress());那么我们从NettyHandler入手其中有个NettyHandler.messageReceived方法,具体我们看它做了什么?
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } }** handler.received(channel, e.getMessage())** 首先分析下handler 对象是什么?服务发布的时候,组装了一系列handller, 代码如下:
HeaderExchanger.bind
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }NettyServer.wrap了很多handler
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }经过一系列的包装,最终服务端的handler处理链为: ->MultiMessageHandler :复合消息处理 ->HeartbeatHandler : 心跳消息处理 ->AllChannelHandler: 业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任务给线程池处理。 ->DecodeHandler: 消息解码处理 ->HeaderExchangeHandler ->ExchangeHandlerAdaptive
HeaderExchangeHandler 交互层请求响应处理,有三种处理方式
handlerRequest,双向请求handler.received 单向请求handleResponse 响应消息 代码如下: public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }我们看下双向处理 handleRequest(exchangeChannel, request);处理请求并返回response
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); return res; } // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }调用DubboProtocol中定义的ExchangeHandlerAdaptive.replay方法处理消息
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { invoker.invoke(inv); }invoker.invoke(inv);其中 invoker 是那个对象呢?之前RegistryDirectory中发布本地服务的方法中,对invoker做的包装,通过InvokerDelegete对原本的invoker做了一层包装,而原本的invoker是什么呢?是一个JavassistProxyFactory生成的动态代理生成的。所以此处的invoker应该是Filter(Listener(InvokerDelegete(AbstractProxyInvoker (Wrapper.invokeMethod))) InvokerDelegete对原本的invoker进行包装,代码如下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){ String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper<T>) exporter; }** 通过以上分析可以帮助我们更加清楚的了解dubbo架构服务调用具体都做了些什么事情,结合官网给我们提供的架构图,望能给各位带来帮助**