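First, take a look at the sequence diagram for consumer-side reference initialization that the official site provides.

On the consumer side, code parsing starts from the following configuration:

```xml
<dubbo:reference id="xxxService" interface="xxx.xxx.Service"/>
```

The code (`ReferenceConfig#createProxy`) is as follows: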
```java
private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer;
    if (isInjvm() == null) {
        if (url != null && url.length() > 0) {
            // A URL was specified explicitly: do not use a local (in-JVM) reference
            isJvmRefer = false;
        } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
            // By default, reference the local service if it is exported in this JVM
            isJvmRefer = true;
        } else {
            isJvmRefer = false;
        }
    } else {
        isJvmRefer = isInjvm().booleanValue();
    }

    if (isJvmRefer) {
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        if (url != null && url.length() > 0) {
            // User-specified URL: either a point-to-point direct address or a registry URL
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        url = url.setPath(interfaceName);
                    }
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // Assemble the URLs from the registry configuration
            List<URL> us = loadRegistries(false);
            if (us != null && us.size() > 0) {
                for (URL u : us) {
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            if (urls == null || urls.size() == 0) {
                throw new IllegalStateException("No such any registry to reference " + interfaceName
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion()
                        + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
            }
        }

        if (urls.size() == 1) {
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // the last registry URL is the one used
                }
            }
            if (registryURL != null) { // there is a registry-protocol URL
                // With a registry, the Cluster used is always AvailableCluster
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // no registry URL
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true; // default true
    }
    if (c && !invoker.isAvailable()) {
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName
                + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName
                + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl()
                + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    // Create the service proxy
    return (T) proxyFactory.getProxy(invoker);
}
```

The invoker is obtained by calling `refprotocol.refer`. So what kind of object is `refprotocol`? The code below shows that it is an adaptive extension point: a dynamically generated `Protocol$Adaptive`.
```java
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
```

The `refer` method of the generated `Protocol$Adaptive` looks like this:
```java
public Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
    if (arg1 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg1;
    // The extension name is the URL's protocol, defaulting to "dubbo"
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.refer(arg0, arg1);
}
```

At this point the URL's protocol is `registry://`, so the extension that the adaptive class resolves to is `RegistryProtocol`.
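To make the dispatch rule concrete, here is a minimal sketch of the same idea with no Dubbo dependency. The `SimpleProtocol` interface, the `EXTENSIONS` map, and `adaptiveRefer` are hypothetical names invented for illustration; the real `ExtensionLoader` loads extensions from configuration files and generates the adaptive class at runtime, which this sketch does not attempt to reproduce.

```java
import java.util.HashMap;
import java.util.Map;

public class AdaptiveDispatchSketch {
    /** Hypothetical stand-in for an extension interface such as Protocol. */
    public interface SimpleProtocol {
        String refer(String url);
    }

    // Hypothetical table of named extensions (assumption: a plain map instead of ExtensionLoader).
    static final Map<String, SimpleProtocol> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol.refer(" + url + ")");
        EXTENSIONS.put("dubbo", url -> "DubboProtocol.refer(" + url + ")");
    }

    /** Returns the scheme of the URL, e.g. "registry" for "registry://host/path", or null if absent. */
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx < 0 ? null : url.substring(0, idx);
    }

    /** Plays the role of the adaptive class: pick the extension by URL protocol, defaulting to "dubbo". */
    public static String adaptiveRefer(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String protocol = protocolOf(url);
        String extName = (protocol == null ? "dubbo" : protocol);
        SimpleProtocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.refer(url);
    }

    public static void main(String[] args) {
        System.out.println(adaptiveRefer("registry://192.154.1.20:2181/com.foo.BarService"));
        System.out.println(adaptiveRefer("dubbo://10.0.0.1:20880/com.foo.BarService"));
    }
}
```

The point of the design is that the caller holds a single reference and the concrete implementation is chosen per call from data in the URL, which is why the same `refprotocol` field can later serve both `registry://` and `dubbo://` URLs.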
The code of `RegistryProtocol.refer()` is as follows:

```java
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // Rewrites a URL such as registry://192.154.1.20 into zookeeper://192.154.1.20;
    // in this example the value of the registry parameter is "zookeeper"
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // Yields a ZookeeperRegistry
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}
```

The code of `com.alibaba.dubbo.registry.integration.RegistryProtocol#doRefer()` is as follows:
```java
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry); // the ZookeeperRegistry
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        // Registers the consumer address with the registry
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}
```

What exactly does `directory.subscribe()` do?
```java
public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this); // registry = ZookeeperRegistry, this = RegistryDirectory
}
```

`ZookeeperRegistry` does not implement `subscribe` itself, so the call goes to the parent class method `com.alibaba.dubbo.registry.support.FailbackRegistry#subscribe`:
```java
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Send the subscription request to the server side
        doSubscribe(url, listener); // calls the subclass's doSubscribe method
    } catch (Exception e) {
        Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if (urls != null && urls.size() > 0) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: "
                    + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache")
                    + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup check is enabled, throw the exception directly
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // Record the failed subscription in the failure list so it is retried periodically
        addFailedSubscribed(url, listener);
    }
}
```

The `doSubscribe` call therefore lands in `com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe`.
Its main job is to create and watch the three nodes `providers`, `routers` and `configurators`:

```java
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners == null) {
        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
        listeners = zkListeners.get(url);
    }
    ChildListener zkListener = listeners.get(listener);
    if (zkListener == null) {
        listeners.putIfAbsent(listener, new ChildListener() {
            public void childChanged(String parentPath, List<String> currentChilds) {
                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
            }
        });
        zkListener = listeners.get(listener);
    }
    zkClient.create(path, false);
    List<String> children = zkClient.addChildListener(path, zkListener);
    if (children != null) {
        urls.addAll(toUrlsWithEmpty(url, path, children));
    }
}
notify(url, listener, urls);
```

The final `notify(url, listener, urls)` call publishes the list of addresses that are already available. The real work happens in `doNotify(url, listener, urls)`, which loops over the address information under the `providers`, `routers` and `configurators` directories and saves it locally:

```java
// Run notify() for each of the providers/routers/configurators paths.
// The first call delivers the initial addresses; later calls come from the listener when they change.
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
    String category = entry.getKey();
    List<URL> categoryList = entry.getValue();
    categoryNotified.put(category, categoryList);
    saveProperties(url);
    listener.notify(categoryList);
}
```

`listener.notify(categoryList)` in fact invokes `com.alibaba.dubbo.registry.integration.RegistryDirectory#notify`.
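The "fetch once, then listen for changes" pattern used by `doSubscribe` can be illustrated with a small in-memory sketch. Everything here is an assumption made for illustration: `Node` stands in for one registry path and its children, and `Listener` for the notification callback; no ZooKeeper client is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubscribeSketch {
    /** Hypothetical callback, similar in role to NotifyListener. */
    public interface Listener {
        void onChange(List<String> urls);
    }

    /** Hypothetical in-memory stand-in for one registry node and its children. */
    public static class Node {
        private final List<String> children = new ArrayList<>();
        private final List<Listener> listeners = new ArrayList<>();

        /** Registers the listener and returns the current children. */
        public List<String> addChildListener(Listener listener) {
            listeners.add(listener);
            return new ArrayList<>(children);
        }

        /** Replaces the children and pushes the new list to every registered listener. */
        public void setChildren(List<String> newChildren) {
            children.clear();
            children.addAll(newChildren);
            for (Listener listener : listeners) {
                listener.onChange(new ArrayList<>(children));
            }
        }
    }

    /** Records every notification so both phases can be inspected. */
    public static class Directory implements Listener {
        public final List<List<String>> received = new ArrayList<>();

        @Override
        public void onChange(List<String> urls) {
            received.add(urls);
        }
    }

    /** Subscribe = register the listener, then deliver the current list once. */
    public static void subscribe(Node node, Listener listener) {
        List<String> current = node.addChildListener(listener);
        listener.onChange(current);
    }

    public static void main(String[] args) {
        Node providers = new Node();
        providers.setChildren(Arrays.asList("dubbo://10.0.0.1:20880"));

        Directory directory = new Directory();
        subscribe(providers, directory); // first notification: the initial address list

        // Second notification: triggered by a change on the node
        providers.setChildren(Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        System.out.println(directory.received);
    }
}
```

Registering the listener before reading the current children, as the original code does with `addChildListener`, avoids missing a change that happens between the read and the registration.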
```java
public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url
                    + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // configurators
    if (configuratorUrls != null && configuratorUrls.size() > 0) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && routerUrls.size() > 0) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // Merge the override parameters
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    refreshInvoker(invokerUrls);
}
```

This in turn calls `com.alibaba.dubbo.registry.integration.RegistryDirectory#refreshInvoker`.
```java
/**
 * Converts the list of invoker URLs into a list of invokers. The rules are:
 * 1. If a URL has already been converted into an invoker, it is not referred again but taken
 *    from the cache. Note that a change to any parameter of the URL causes a new refer.
 * 2. If the incoming invoker list is not empty, it is the latest invoker list.
 * 3. If the incoming invokerUrls list is empty, only override or route rules were pushed;
 *    a cross-check is needed to decide whether to refer again.
 * @param invokerUrls must not be null
 */
private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // forbid access
        this.methodInvokerMap = null; // clear the list
        destroyAllInvokers(); // close all invokers
    } else {
        this.forbidden = false; // allow access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls); // cache the invokerUrls list for cross-checking
        }
        if (invokerUrls.size() == 0) {
            return;
        }
        // Convert the URL list into invokers: each dubbo:// URL becomes a DubboInvoker
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
        // Map each method name to its list of invokers
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
        // state change
        // If the conversion went wrong, do nothing
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size()
                    + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // close invokers that are no longer used
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}
```

`toInvokers` turns the URLs into invokers; a URL that has already been referred is not referred again.
```java
/**
 *
 * @param urls
 * @param overrides
 * @param query
 * @return invokers
 */
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    if (urls == null || urls.size() == 0) {
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<String>();
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        // If the reference side configured a protocol, accept only matching protocols
        if (queryProtocols != null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                continue;
            }
        }
        if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol()
                    + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress()
                    + " to consumer " + NetUtils.getLocalHost()
                    + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
            continue;
        }
        URL url = mergeUrl(providerUrl);

        String key = url.toFullString(); // URL parameters are sorted
        if (keys.contains(key)) { // duplicate URL
            continue;
        }
        keys.add(key);
        // The cache key is the URL without the merged consumer-side parameters: however the consumer
        // merges parameters, a change in the provider URL triggers a new refer
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // not in the cache, refer again
            try {
                boolean enabled = true;
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // put the new reference into the cache
                newUrlInvokerMap.put(key, invoker);
            }
        } else {
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}
```

The key statement is `invoker = new InvokerDelegete(protocol.refer(serviceType, url), url, providerUrl);`. Here `protocol` is again the dynamically generated adaptive `Protocol`; because the provider URL uses the `dubbo://` protocol, it dispatches to `com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#refer`, which returns a `DubboInvoker`.
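The reuse rule applied by `refreshInvoker` and `toInvokers` (keep an invoker whose URL key is unchanged, create one for a new key, drop the rest) can be sketched as follows. `SimpleInvoker`, `refresh` and `referCount` are hypothetical names for illustration, and plain strings stand in for Dubbo's `URL`; this is a simplified model, not the library's implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InvokerCacheSketch {
    /** Hypothetical invoker that only remembers the URL it was created for. */
    public static class SimpleInvoker {
        public final String url;

        SimpleInvoker(String url) {
            this.url = url;
        }
    }

    private Map<String, SimpleInvoker> urlInvokerMap = new HashMap<>();

    /** Counts how many invokers had to be newly created (each one models a refer call). */
    public int referCount = 0;

    /** Rebuilds the URL-to-invoker map, reusing cached invokers whose key is unchanged. */
    public void refresh(List<String> urls) {
        Map<String, SimpleInvoker> newMap = new LinkedHashMap<>();
        for (String url : urls) {
            if (newMap.containsKey(url)) {
                continue; // duplicate URL
            }
            SimpleInvoker invoker = urlInvokerMap.get(url);
            if (invoker == null) {
                // Not cached: this is where the real code would call protocol.refer(...)
                invoker = new SimpleInvoker(url);
                referCount++;
            }
            newMap.put(url, invoker);
        }
        // Invokers missing from newMap are the ones the real code would destroy
        urlInvokerMap = newMap;
    }

    public SimpleInvoker get(String url) {
        return urlInvokerMap.get(url);
    }

    public static void main(String[] args) {
        InvokerCacheSketch directory = new InvokerCacheSketch();
        directory.refresh(Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        directory.refresh(Arrays.asList("dubbo://10.0.0.2:20880", "dubbo://10.0.0.3:20880"));
        System.out.println("refer calls: " + directory.referCount);
    }
}
```

Because the key is the full URL string, changing any parameter of a provider URL produces a new key and therefore a new invoker, which matches rule 1 in the `refreshInvoker` comment.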
```java
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}
```

What does `getClients(url)` do? It creates the connection used to talk to the provider: a `NettyClient`, obtained through `Exchangers.connect(url, requestHandler)`.
```java
private ExchangeClient[] getClients(URL url) {
    // Whether the connection is shared
    boolean service_share_connect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // If connections is not configured, share one connection; otherwise one set of connections per service
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}

// Eventually calls:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}
```

The call `doRefer(cluster, registry, type, url)` shown earlier takes a `cluster` object. Let's see how it is defined.
```java
// First, the Cluster interface is defined as:
@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     *
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

}

public class RegistryProtocol implements Protocol {
    // From this we can tell that cluster is an injected extension point, and that the injected object
    // is the adaptive class Cluster$Adaptive, whose bytecode is generated dynamically
    private Cluster cluster;

    public void setCluster(Cluster cluster) {
        this.cluster = cluster;
    }
}
```

The dynamically generated `Cluster$Adaptive` class looks like this:
```java
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
}
```

**The `Cluster$Adaptive` here is not as simple as it looks either. With the Dubbo configuration below, the `cluster` attribute defaults to `failover` when it is not configured, so the extension obtained through `Cluster$Adaptive` above is `FailoverCluster`.**
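```xml
<dubbo:reference interface="com.foo.BarService" check="false" cluster="failover" />
```

The extension point configuration shows that `FailoverCluster` is not used on its own: it is wrapped by `MockClusterWrapper`. If an extension class has a constructor whose parameter is the extension interface itself, it is treated as a wrapper and decorates the actual extension. For `Cluster`, the wrapper is `MockClusterWrapper`: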
```java
public class MockClusterWrapper implements Cluster {
    // Satisfies the wrapper rule: there is a constructor whose parameter is the extension interface itself
    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
}
```

So `cluster.join(directory)` actually calls `com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper#join` first, which then calls `FailoverCluster.join`.
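The wrapper convention can be checked with plain reflection. The sketch below is a simplified model: `SimpleCluster`, `Failover`, `MockWrapper`, `isWrapper` and `load` are hypothetical names for illustration, and strings stand in for invokers. Only the rule itself (a class with a constructor that takes the extension interface is used to decorate the real extension) comes from the text above.

```java
public class WrapperSketch {
    /** Hypothetical stand-in for the Cluster extension interface. */
    public interface SimpleCluster {
        String join(String directory);
    }

    /** The actual extension, playing the role of FailoverCluster. */
    public static class Failover implements SimpleCluster {
        @Override
        public String join(String directory) {
            return "FailoverInvoker(" + directory + ")";
        }
    }

    /** A wrapper: its constructor takes the extension interface itself. */
    public static class MockWrapper implements SimpleCluster {
        private final SimpleCluster cluster;

        public MockWrapper(SimpleCluster cluster) {
            this.cluster = cluster;
        }

        @Override
        public String join(String directory) {
            return "MockInvoker(" + cluster.join(directory) + ")";
        }
    }

    /** A class counts as a wrapper if it has a public constructor taking the interface type. */
    public static boolean isWrapper(Class<?> impl, Class<?> type) {
        try {
            impl.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Creates the real extension and decorates it with the wrapper when the rule matches. */
    public static SimpleCluster load() {
        SimpleCluster instance = new Failover();
        try {
            if (isWrapper(MockWrapper.class, SimpleCluster.class)) {
                instance = MockWrapper.class.getConstructor(SimpleCluster.class).newInstance(instance);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to wrap extension", e);
        }
        return instance;
    }

    public static void main(String[] args) {
        System.out.println(load().join("dir")); // prints MockInvoker(FailoverInvoker(dir))
    }
}
```

The nesting in the output mirrors the call order described above: the wrapper's `join` runs first and delegates to the wrapped extension's `join`.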
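`FailoverCluster.join` itself simply wraps the directory in a `FailoverClusterInvoker`:

```java
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<T>(directory);
}
```

So what does `doRefer` actually do?

1. Registers the `consumer://` URL with the registry.
2. Subscribes to address changes in ZooKeeper.
3. Establishes the session connections, which are based on Netty by default.
4. Creates the invoker objects.
5. Calls `cluster.join()` and returns the resulting invoker.

Back in `createProxy`, `proxyFactory` is likewise a dynamically generated adaptive class, `ProxyFactory$Adaptive`, and its `getProxy` method is called. The code is as follows: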
```java
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
    if (arg0 == null)
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null)
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    String extName = url.getParameter("proxy", "javassist");
    if (extName == null)
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
    com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    return extension.getProxy(arg0);
}
```

As the code shows, the dynamic proxy is again implemented with javassist by default. Because the wrapper (decorator) rule applies here as well, the actual structure is `StubProxyFactoryWrapper(JavassistProxyFactory)`, and the call finally reaches `JavassistProxyFactory.getProxy`:
```java
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
```

The code that `Proxy.getProxy(interfaces)` generates dynamically for a service method looks like this:
```java
public java.lang.String sayHello(java.lang.String arg0) {
    Object[] args = new Object[1];
    args[0] = ($w) $1;
    Object ret = handler.invoke(this, methods[0], args);
    return (java.lang.String) ret;
}
```

The `handler` in `handler.invoke(this, methods[0], args)` above is the `new InvokerInvocationHandler(invoker)` that was passed in `JavassistProxyFactory.getProxy`.
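So when we call the provider through the referenced service as usual, we are in fact calling a proxy object that Dubbo created dynamically for us under the hood, for example `$Proxy0@2556`.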
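The chain "proxy method, then handler, then invoker" can be reproduced with the JDK's own `java.lang.reflect.Proxy`. Note the substitution: Dubbo's default `JavassistProxyFactory` generates bytecode with javassist, whereas this sketch uses the JDK dynamic proxy because it needs no extra dependency. `HelloService`, `SimpleInvoker` and `InvokerHandler` are hypothetical names, and the "remote call" is simulated locally.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxySketch {
    /** Hypothetical service interface, like the one declared in dubbo:reference. */
    public interface HelloService {
        String sayHello(String name);
    }

    /** Hypothetical stand-in for Invoker: receives the method name and arguments. */
    public interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    /** Plays the role of InvokerInvocationHandler: forwards interface calls to the invoker. */
    static class InvokerHandler implements InvocationHandler {
        private final SimpleInvoker invoker;

        InvokerHandler(SimpleInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally instead of being sent out
                return method.invoke(invoker, args);
            }
            return invoker.invoke(method.getName(), args);
        }
    }

    /** Builds the proxy that the application code would receive as its service reference. */
    public static HelloService refer() {
        // Assumption: a lambda simulates the remote provider's answer
        SimpleInvoker remote = (methodName, args) -> "Hello " + args[0];
        return (HelloService) Proxy.newProxyInstance(
                HelloService.class.getClassLoader(),
                new Class<?>[] {HelloService.class},
                new InvokerHandler(remote));
    }

    public static void main(String[] args) {
        HelloService service = refer();
        System.out.println(service.sayHello("dubbo"));
        System.out.println(Proxy.isProxyClass(service.getClass()));
    }
}
```

The application only ever sees the `HelloService` interface; every call on it is funneled through the handler's single `invoke` method, which is where Dubbo hands the call over to the cluster invoker.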