我们要根据不同的驱动包实现不同数据库的连接,下面看下 mysql 驱动的实现:**在 META-INF/services/ 下以接口全路径命名的文件,比如:**
如图:Extension 机制。如果 @Adaptive 加在类上面,表示这是一个自定义的自适应扩展点;如果 @Adaptive 加在方法上面,则会创建一个动态自适应扩展点。包结构如下:com.alibaba.dubbo.common.extension;factory:ExtensionFactory -> AdaptiveExtensionFactory -> SpiExtensionFactory -> SpringExtensionFactory;Annotation:@Activate、@Adaptive(适配器)、@SPI。ExtensionLoader 加载扩展点类调用时序图:首先我们分析下 Extension 机制。
Protocol protocol = ExtensionLoader. getExtensionLoader(Protocol.class). getAdaptiveExtension(); //Protocol$Adaptive
getExtensionLoader(Class clazz) 根据一个class类型获取ExtensionLoader
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { if (type == null) throw new IllegalArgumentException("Extension type == null"); if(!type.isInterface()) { throw new IllegalArgumentException("Extension type(" + type + ") is not interface!"); } if(!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!"); } ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); //创建ExtensionLoader 对象并放入内存缓存中 loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; }getAdaptiveExtension() 获得一个自适应扩展点
public T getAdaptiveExtension() { Object instance = cachedAdaptiveInstance.get();//获得一个Holder 实列,首次加载肯定为空。 if (instance == null) { //double check if(createAdaptiveInstanceError == null) { synchronized (cachedAdaptiveInstance) { //从内存缓存中获取自适应扩展点(单列模式) instance = cachedAdaptiveInstance.get(); if (instance == null) { try { //创建自适应扩展点。 instance = createAdaptiveExtension(); //dubbo中大量使用内存缓存。 cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t); } } } } else { throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } } return (T) instance; }createAdaptiveExtension() 创建自适应扩展点
private T createAdaptiveExtension() { try { //injectExtension 可以实现扩展点的依赖注入 return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e); } }.如果Adaptive在类层面则创建为默认自适应扩展点,代码如下
//.... 解析 META-INF/dubbo META-INF/service 配置文件 if (line.length() > 0) { Class<?> clazz = Class.forName(line, true, classLoader); //加载对应的实现类,并且判断实现类必须是当前的加载的扩展点的实现 if (!type.isAssignableFrom(clazz)) { throw new IllegalStateException("Error when load extension class(interface: " + type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + "is not subtype of interface."); } //判断是否有自定义适配类,如果有,则在前面讲过的获取适配类的时候,直接返回当前的自定义适配类,不需要再动态创建 if (clazz.isAnnotationPresent(Adaptive.class)) { if (cachedAdaptiveClass == null) { cachedAdaptiveClass = clazz; } else if (!cachedAdaptiveClass.equals(clazz)) { throw new IllegalStateException("More than 1 adaptive class found: " + cachedAdaptiveClass.getClass().getName() + ", " + clazz.getClass().getName()); } }如果类层面没有 Adaptive注解,则看属性是否带有 type类型的构造函数,如果有,则认为是
try {
    //如果没有Adaptive注解,则判断当前类是否带有参数是type类型的构造函数,如果有,则认为是
    //wrapper类。这个wrapper实际上就是对扩展类进行装饰.
    //可以在dubbo-rpc-api/internal下找到Protocol文件,发现Protocol配置了3个装饰器,
    //分别是filter/listener/mock. 所以对Protocol这个实例来说,会增加对应的装饰器
    clazz.getConstructor(type);
    //得到带有public DubboProtocol(Protocol protocol)的扩展点。进行包装
    Set<Class<?>> wrappers = cachedWrapperClasses;
    if (wrappers == null) {
        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
        wrappers = cachedWrapperClasses;
    }catch(){ ... }
上述二者都不满足,则会动态创建字节码文件
//创建一个适配器扩展点。(创建一个动态的字节码文件) private Class<?> createAdaptiveExtensionClass() { //生成字节码代码 String code = createAdaptiveExtensionClassCode(); //获得类加载器 ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); //动态编译字节码 return compiler.compile(code, classLoader); }则生成动态的自适应扩展点为Protocol$Adaptive
/**
 * Adaptive {@code Protocol}: {@code refer} and {@code export} look up the real extension by the
 * protocol name carried in the URL and delegate to it; {@code destroy} and
 * {@code getDefaultPort} are not adaptive and always throw.
 */
public class Protocol$Adaptive implements Protocol {

    /** Not adaptive: always throws {@code UnsupportedOperationException}. */
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void "
                + "com.alibaba.dubbo.rpc.Protocol.destroy() "
                + "of interface com.alibaba.dubbo.rpc.Protocol "
                + "is not adaptive method!");
    }

    /** Not adaptive: always throws {@code UnsupportedOperationException}. */
    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort()"
                + " of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    /**
     * Delegates to the extension selected by the protocol of {@code arg1}.
     *
     * @throws IllegalArgumentException if {@code arg1} is null
     */
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) {
            throw new IllegalArgumentException("url == null");
        }
        return selectExtension(arg1).refer(arg0, arg1);
    }

    /**
     * Delegates to the extension selected by the protocol of the invoker's URL.
     *
     * @throws IllegalArgumentException if {@code arg0} or its URL is null
     */
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        return selectExtension(arg0.getUrl()).export(arg0);
    }

    /** Resolves the Protocol extension named by the URL's protocol, using "dubbo" when it is null. */
    private com.alibaba.dubbo.rpc.Protocol selectExtension(com.alibaba.dubbo.common.URL url) {
        String extName;
        if (url.getProtocol() == null) {
            extName = "dubbo";
        } else {
            extName = url.getProtocol();
        }
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        }
        return (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    }
}
至此Extension机制分析完毕。
官方提供的流程图如下: 首先我们猜想下启动一个服务做了什么事情? 1. 调用注册中心,把服务发布到 zookeeper;2. 启动一个 netty 服务,监听服务发布的端口。 由于 dubbo 是基于 spring 进行发布的,spring 提供两个类去扩展自定义标签:NamespaceHandler 和 BeanDefinitionParser,约定首先加载 META-INF/spring.handlers 文件。 dubbo-config -> spring 文件解析入口,spring 加载 init 方法
spring 将配置文件信息解析成自己认识的类信息。从 DubboBeanDefinitionParser.parse 方法进入服务发布的入口 ServiceBean
public class ServiceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean { }
调用ServiceConfig.export()方法:
/**
 * Exports the service, immediately or on a background thread after {@code delay} milliseconds.
 * <p>
 * {@code export} and {@code delay} are taken from {@code provider} when they are not set here.
 * Nothing happens when {@code export} resolves to {@code false}.
 */
public synchronized void export() {
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && !export.booleanValue()) {
        return;
    }

    boolean delayed = delay != null && delay > 0;
    if (!delayed) {
        doExport();
        return;
    }

    Runnable delayedExport = new Runnable() {
        public void run() {
            try {
                Thread.sleep(delay);
            } catch (Throwable ignored) {
                // Kept from the original: a failed or interrupted sleep does not stop the export.
            }
            doExport();
        }
    };
    Thread exportThread = new Thread(delayedExport);
    exportThread.setDaemon(true);
    exportThread.setName("DelayExportServiceThread");
    exportThread.start();
}
延时不延时都会调用doExport方法
/**
 * Runs the configuration checks listed below and then exports the service URLs via
 * {@code doExportUrls()}. The leading part of the method is elided ("......." ) in this listing.
 */
protected synchronized void doExport() {
    .......
    // Check whether the previously configured properties are empty; per the original note,
    // empty ones are set again.
    checkApplication();
    checkRegistry();
    checkProtocol();
    appendProperties(this);
    checkStubAndMock(interfaceClass);
    // Use the interface name as the path when no path is configured.
    if (path == null || path.length() == 0) {
        path = interfaceName;
    }
    doExportUrls();
}
调用 doExportUrls()方法
private void doExportUrls() { List<URL> registryURLs = loadRegistries(true);//是不是获得注册中心的配置 ["registry:192.168.11.100:2181/com.....", "registry:192.168.11.100:6379/com.....",] for (ProtocolConfig protocolConfig : protocols) { //是不是支持多协议发布 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }调用 doExportUrlsFor1Protocol(protocolConfig, registryURLs)方法
/**
 * Exports the service for a single protocol configuration.
 * <p>
 * Steps visible in this listing: collect parameters into a map, build the service URL, let a
 * matching {@code ConfiguratorFactory} extension reconfigure it, then export depending on the
 * URL's scope parameter (none: nothing; not remote: local export; not local: export through
 * {@code protocol}, once per registry URL or directly when no registry applies).
 * <p>
 * NOTE(review): the listing is an excerpt; {@code host} and {@code port} are used but their
 * computation is not shown, and the method's closing brace is missing.
 *
 * @param protocolConfig the protocol configuration to export with; its name defaults to "dubbo"
 * @param registryURLs   registry URLs to register with; may be null or empty
 */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }
    // Put the configuration information into a map.
    Map<String, String> map = new HashMap<String, String>();
    if (anyhost) {
        map.put(Constants.ANYHOST_KEY, "true");
    }
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    appendParameters(map, application);
    // Export the service.
    String contextPath = protocolConfig.getContextpath();
    // Fall back to the provider's context path when the protocol has none.
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
    // If a ConfiguratorFactory extension exists for this protocol, let it reconfigure the URL.
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // Scope "none": do not expose at all.
    if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        // Unless the scope is "remote", expose locally ("remote" means remote exposure only).
        // Publish the service.
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // Unless the scope is "local", expose as a remote service ("local" means local exposure only).
        // Register the service.
        if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) {
                for (URL registryURL : registryURLs) {//
                    // NOTE(review): the listing shows "{//" directly before the next statement
                    // with the line break lost; it is treated as live code here - confirm.
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    // Obtain the Invoker through proxyFactory; the service URL travels as the
                    // encoded EXPORT_KEY parameter of the registry URL.
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    // Register the service. Per the original note: Dubbo is URL-driven, so the
                    // protocol handling this invoker is RegistryProtocol.
                    Exporter<?> exporter = protocol.export(invoker);
                    // Add the exporter to the list.
                    exporters.add(exporter);
                }
            } else {
                // No registry applies: export the invoker built from the service URL itself.
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                Exporter<?> exporter = protocol.export(invoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
调用 RegistryProtocol.export(invoker)方法
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker 本地发布 启动一个netty服务 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); //得到需要注册到zk上的协议地址,也就是dubbo:// final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //注册服务 registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }1.调用doLocalExport(final Invoker originInvoker)方法 开启一个netty 服务 调用DubboProtocol.export()
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){ if (logger.isWarnEnabled()){ logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer(url); return exporter; }调用 openServer(url)
private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } }调用dubboProtocol 的createServer(url) 方法 调用 HeaderExchanger.bind(URL url, ExchangeHandler handler)
/**
 * Binds a server for the URL through {@code Transporters} and wraps it in a
 * {@code HeaderExchangeServer}. The given handler is wrapped first in a
 * {@code HeaderExchangeHandler} and then in a {@code DecodeHandler}.
 */
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
    DecodeHandler decodingHandler = new DecodeHandler(exchangeHandler);
    return new HeaderExchangeServer(Transporters.bind(url, decodingHandler));
}
调用Transporters.bind(url,newDecodeHandler(newHeaderExchangeHandler(handler)))
调用 Transporter$Adaptive(自适应扩展点),最终进入 NettyTransporter.bind(URL url, ChannelHandler listener)
/**
 * Creates a {@code NettyServer} for the given URL and channel handler.
 *
 * @return the newly constructed server
 */
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    Server nettyServer = new NettyServer(url, listener);
    return nettyServer;
}
NettyServer具体干了什么事??
至此 开启一个netty服务,监听发布端口
getRegistry具体做了什么事?
public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); // 锁定注册中心获取过程,保证注册中心单一实例 LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //调用createRegistry(url) registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // 释放锁 LOCK.unlock(); } }调用createRegistry(url) 默认得到一个 ZookeeperRegistry 实列
/**
 * Creates a {@code ZookeeperRegistry} for the URL using this factory's
 * {@code zookeeperTransporter}.
 */
public Registry createRegistry(URL url) {
    Registry zookeeperRegistry = new ZookeeperRegistry(url, zookeeperTransporter);
    return zookeeperRegistry;
}
调用doRegister(url)方法 通过zookeeper 创建一个url
/**
 * Registers the URL by creating its path through {@code zkClient}.
 *
 * @param url the URL to register; its {@code DYNAMIC_KEY} parameter (default true) is passed as
 *            the second argument of {@code zkClient.create}
 * @throws RpcException wrapping any {@code Throwable} raised while creating the path
 */
protected void doRegister(URL url) {
    try {
        String nodePath = toUrlPath(url);
        boolean dynamic = url.getParameter(Constants.DYNAMIC_KEY, true);
        zkClient.create(nodePath, dynamic);
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
至此 服务发布过程结束,实质是做了两件事情 1.通过netty 启动了一个服务监听 2. 通过zookeeper 注册了一个协议地址