Flume Startup Source Code Analysis

    xiaoxiao  2021-04-15

    Notes on analyzing the Flume startup code.

    The walkthrough uses the following configuration file:

    agent1.sources=source1
    agent1.channels=channel1
    agent1.sinks=sink1
    agent1.sources.source1.type = TAILDIR
    agent1.sources.source1.channels = channel1
    agent1.sources.source1.channels.skipToEnd = True
    agent1.sources.source1.positionFile = /home/flume/data/taildir_position.json
    agent1.sources.source1.filegroups = f1
    agent1.sources.source1.filegroups.f1 = /home/flume/test/logs/abc.log
    agent1.sources.source1.headers.f1.headerKey1 = value1
    agent1.sources.source1.fileHeader = true
    agent1.channels.channel1.type=memory
    agent1.channels.channel1.capacity = 1000
    agent1.channels.channel1.transactionCapacity = 1000
    agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.sink1.brokerList=a:9092,b:9092,c:9092
    agent1.sinks.sink1.topic=connect-test-one
    agent1.sinks.sink1.requiredAcks = 1
    agent1.sinks.sink1.batchSize = 20
    agent1.sinks.sink1.channel = channel1

    1. Application.java -> main method: first, the command-line options are parsed to determine which kind of configuration source to load:

    try {
      boolean isZkConfigured = false;

      Options options = new Options();

      Option option = new Option("n", "name", true, "the name of this agent");
      option.setRequired(true);
      options.addOption(option);

      option = new Option("f", "conf-file", true,
          "specify a config file (required if -z missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option(null, "no-reload-conf", false,
          "do not reload config file if changed");
      options.addOption(option);

      // Options for Zookeeper
      option = new Option("z", "zkConnString", true,
          "specify the ZooKeeper connection to use (required if -f missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("p", "zkBasePath", true,
          "specify the base path in ZooKeeper for agent configs");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("h", "help", false, "display help text");
      options.addOption(option);

      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);
      // ... (rest omitted)

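    This part only reads the agent's command-line settings, so it is not covered here. The actual parsing of the configuration file comes next: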

    ...
      if (reload) {
        // 111
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(
                agentName, configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else {
        // 2222
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName, configurationFile);
        application = new Application();
        application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }
    }
    application.start();

    final Application appReference = application;
    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
      @Override
      public void run() {
        appReference.stop();
      }
    });

    } catch (Exception e) {
      logger.error("A fatal error occurred while running. Exception follows.", e);
    }
    ...
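    The //111 branch passes the file, an EventBus and the interval 30 to a polling provider. The provider's internals are not shown in this post, so what follows is only a minimal sketch of the general idea, assuming the provider compares the file's last-modified timestamp on each pass and publishes a new configuration when it has moved forward. The class ReloadPollSketch and its two callbacks are hypothetical names, not Flume API.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of a "reload when the file changed" poller.
class ReloadPollSketch {
    private final LongSupplier lastModified; // e.g. configFile::lastModified
    private final Runnable onChange;         // e.g. re-parse the file and publish the result
    private long lastSeen = 0L;

    ReloadPollSketch(LongSupplier lastModified, Runnable onChange) {
        this.lastModified = lastModified;
        this.onChange = onChange;
    }

    // One polling pass: fire the callback only if the timestamp moved forward.
    void poll() {
        long current = lastModified.getAsLong();
        if (current > lastSeen) {
            lastSeen = current;
            onChange.run();
        }
    }
}
```

    A scheduler such as Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(watcher::poll, 0, 30, TimeUnit.SECONDS) could drive poll() at the 30-second interval mentioned above.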

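    At //111, reload is a boolean flag that decides whether the configuration file is reloaded dynamically (it is polled every 30 s). When it is true, the EventBus code path is used; for details see the blog post [Guava学习笔记:EventBus] (Guava Study Notes: EventBus). The rest of this post covers the case where the configuration file is not reloaded, i.e. the else branch marked //2222. That branch uses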

    PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile);

    to create the configurationProvider object. Its superclass constructor mainly creates the following objects:

    public AbstractConfigurationProvider(String agentName) {
      super();
      this.agentName = agentName;
      this.sourceFactory = new DefaultSourceFactory();
      this.sinkFactory = new DefaultSinkFactory();
      this.channelFactory = new DefaultChannelFactory();

      channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
    }

    Then the main work is done by:

    application.handleConfigurationEvent(configurationProvider.getConfiguration());

    which loads the configuration file and builds the components. The rest of this post centers on this one line.

    public MaterializedConfiguration getConfiguration() {
      MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
      // getFlumeConfiguration() is the core: it loads the whole configuration; see the code below
      FlumeConfiguration fconfig = getFlumeConfiguration();
      AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
      if (agentConf != null) {
        Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
        Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
        Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
        try {
          loadChannels(agentConf, channelComponentMap);
          loadSources(agentConf, channelComponentMap, sourceRunnerMap);
          loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
          Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
          for (String channelName : channelNames) {
            ChannelComponent channelComponent = channelComponentMap.get(channelName);
            if (channelComponent.components.isEmpty()) {
              LOGGER.warn(String.format("Channel %s has no components connected" +
                  " and has been removed.", channelName));
              channelComponentMap.remove(channelName);
              Map<String, Channel> nameChannelMap =
                  channelCache.get(channelComponent.channel.getClass());
              if (nameChannelMap != null) {
                nameChannelMap.remove(channelName);
              }
            } else {
              LOGGER.info(String.format("Channel %s connected to %s",
                  channelName, channelComponent.components.toString()));
              conf.addChannel(channelName, channelComponent.channel);
            }
          }
          for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
            conf.addSourceRunner(entry.getKey(), entry.getValue());
          }
          for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
            conf.addSinkRunner(entry.getKey(), entry.getValue());
          }
        } catch (InstantiationException ex) {
          LOGGER.error("Failed to instantiate component", ex);
        } finally {
          channelComponentMap.clear();
          sourceRunnerMap.clear();
          sinkRunnerMap.clear();
        }
      } else {
        LOGGER.warn("No configuration found for this host:{}", getAgentName());
      }
      return conf;
    }

    getFlumeConfiguration() ends by calling new FlumeConfiguration(toMap(properties)); its code is below:

    // Read the configuration file, convert it to a map, and build a FlumeConfiguration from it
    @Override
    public FlumeConfiguration getFlumeConfiguration() {
      BufferedReader reader = null;
      try {
        reader = new BufferedReader(new FileReader(file));
        String resolverClassName = System.getProperty("propertiesImplementation",
            DEFAULT_PROPERTIES_IMPLEMENTATION);
        Class<? extends Properties> propsclass = Class.forName(resolverClassName)
            .asSubclass(Properties.class);
        Properties properties = propsclass.newInstance();
        properties.load(reader);
        return new FlumeConfiguration(toMap(properties));
      } catch (IOException ex) {
        LOGGER.error("Unable to load file:" + file
            + " (I/O failure) - Exception follows.", ex);
      } catch (ClassNotFoundException e) {
        LOGGER.error("Configuration resolver class not found", e);
      } catch (InstantiationException e) {
        LOGGER.error("Instantiation exception", e);
      } catch (IllegalAccessException e) {
        LOGGER.error("Illegal access exception", e);
      } finally {
        if (reader != null) {
          try {
            reader.close();
          } catch (IOException ex) {
            LOGGER.warn(
                "Unable to close file reader for file: " + file, ex);
          }
        }
      }
      return new FlumeConfiguration(new HashMap<String, String>());
    }
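    The toMap helper called above is not shown in this post. As a rough illustration of the load-then-flatten step, here is a minimal sketch using only java.util.Properties; the class PropertiesToMapSketch and its method bodies are assumptions for illustration, and a StringReader stands in for the FileReader.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: parse properties text and flatten it into a plain map.
class PropertiesToMapSketch {
    // Copies every key/value pair of a Properties object into a HashMap.
    static Map<String, String> toMap(Properties properties) {
        Map<String, String> result = new HashMap<>();
        for (String name : properties.stringPropertyNames()) {
            result.put(name, properties.getProperty(name));
        }
        return result;
    }

    // Parses properties-format text; whitespace around '=' is dropped by Properties.load.
    static Map<String, String> load(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return toMap(properties);
    }

    public static void main(String[] args) {
        System.out.println(load("agent1.sources=source1\n"
                + "agent1.channels.channel1.capacity = 1000\n"));
    }
}
```

    This is why the sample configuration can mix "key=value" and "key = value" lines: both end up as the same kind of map entry.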

    The code of new FlumeConfiguration(Map<String, String> properties) is as follows:

    /**
     * Creates a populated Flume Configuration object.
     */
    public FlumeConfiguration(Map<String, String> properties) {
      agentConfigMap = new HashMap<String, AgentConfiguration>();
      errors = new LinkedList<FlumeConfigurationError>();
      // Construct the in-memory component hierarchy
      for (String name : properties.keySet()) {
        String value = properties.get(name);
        // addRawProperty populates agentConfigMap.
        // 1: what gets filled here are the contextMaps inside each AgentConfiguration
        if (!addRawProperty(name, value)) {
          logger.warn("Configuration property ignored: " + name + " = " + value);
        }
      }
      // Now iterate thru the agentContext and create agent configs and add them
      // to agentConfigMap
      // what gets filled here are the configMaps inside each AgentConfiguration

      // validate and remove improperly configured components
      // 2
      validateConfiguration();
    }

    The two most important points in the code above are addRawProperty at mark 1 and validateConfiguration at mark 2. Each is analyzed in turn.

    Stepping into FlumeConfiguration.addRawProperty(name, value):

    private boolean addRawProperty(String name, String value) {
      // Null names and values not supported
      if (name == null || value == null) {
        errors.add(new FlumeConfigurationError("", "",
            FlumeConfigurationErrorType.AGENT_NAME_MISSING,
            ErrorOrWarning.ERROR));
        return false;
      }

      // Empty values are not supported
      if (value.trim().length() == 0) {
        errors.add(new FlumeConfigurationError(name, "",
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
            ErrorOrWarning.ERROR));
        return false;
      }

      // Remove leading and trailing spaces
      name = name.trim();
      value = value.trim();

      int index = name.indexOf('.'); // index of the first '.' in the key

      // All configuration keys must have a prefix defined as agent name
      if (index == -1) {
        errors.add(new FlumeConfigurationError(name, "",
            FlumeConfigurationErrorType.AGENT_NAME_MISSING,
            ErrorOrWarning.ERROR));
        return false;
      }

      String agentName = name.substring(0, index); // the part before that '.' is the agent name

      // Agent name must be specified for all properties
      if (agentName.length() == 0) {
        errors.add(new FlumeConfigurationError(name, "",
            FlumeConfigurationErrorType.AGENT_NAME_MISSING,
            ErrorOrWarning.ERROR));
        return false;
      }

      String configKey = name.substring(index + 1); // 333: strip the agent name off the key

      // Configuration key must be specified for every property
      if (configKey.length() == 0) {
        errors.add(new FlumeConfigurationError(name, "",
            FlumeConfigurationErrorType.PROPERTY_NAME_NULL,
            ErrorOrWarning.ERROR));
        return false;
      }

      AgentConfiguration aconf = agentConfigMap.get(agentName);

      // Create the AgentConfiguration on first use and put it into FlumeConfiguration's
      // Map<String, AgentConfiguration> agentConfigMap. This AgentConfiguration holds
      // all the configuration for that agent name.
      if (aconf == null) {
        aconf = new AgentConfiguration(agentName, errors);
        agentConfigMap.put(agentName, aconf);
      }

      // Each configuration key must begin with one of the three prefixes:
      // sources, sinks, or channels.
      // Finally the key/value pair is loaded into the AgentConfiguration;
      // this is the most important step.
      return aconf.addProperty(configKey, value);
    }

    After the split at //333, the configuration should look like this:

    // Strictly speaking this is a map; each line is one key/value pair split at `=`.
    sources=source1
    channels=channel1
    sinks=sink1
    sources.source1.type = TAILDIR
    sources.source1.channels = channel1
    sources.source1.channels.skipToEnd = True
    sources.source1.positionFile = /home/flume/data/taildir_position.json
    sources.source1.filegroups = f1
    sources.source1.filegroups.f1 = /home/flume/test/logs/abc.log
    sources.source1.headers.f1.headerKey1 = value1
    sources.source1.fileHeader = true
    channels.channel1.type=memory
    channels.channel1.capacity = 1000
    channels.channel1.transactionCapacity = 1000
    sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    sinks.sink1.brokerList=a:9092,b:9092,c:9092
    sinks.sink1.topic=connect-test-one
    sinks.sink1.requiredAcks = 1
    sinks.sink1.batchSize = 20
    sinks.sink1.channel = channel1
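    The split that produces this listing can be tried in isolation. The sketch below mirrors only the first-dot split and the empty-part checks of addRawProperty; the class AgentKeySplitSketch is a hypothetical name, and the error bookkeeping of the real method is left out.

```java
// Hypothetical sketch of the "agentName.configKey" split done on every property name.
class AgentKeySplitSketch {
    // Returns {agentName, configKey}, or null when either part would be missing.
    static String[] split(String name) {
        String trimmed = name.trim();
        int index = trimmed.indexOf('.'); // first '.' separates the agent name
        if (index <= 0 || index == trimmed.length() - 1) {
            return null; // no dot, empty agent name, or empty config key
        }
        return new String[] {
            trimmed.substring(0, index),
            trimmed.substring(index + 1)
        };
    }

    public static void main(String[] args) {
        String[] parts = split("agent1.sources.source1.type");
        System.out.println(parts[0] + " | " + parts[1]);
    }
}
```

    Only the first dot matters at this stage; the remaining dots in the config key are interpreted later, per component type.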

    The code that runs next, AgentConfiguration.addProperty, is as follows:

    private boolean addProperty(String key, String value) {
      // Check for configFilters
      if (CONFIG_CONFIGFILTERS.equals(key)) {
        if (configFilters == null) {
          configFilters = value;
          return true;
        } else {
          LOGGER.warn("Duplicate configfilter list specified for agent: {}", agentName);
          addError(CONFIG_CONFIGFILTERS, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }
      // Check for sources
      if (CONFIG_SOURCES.equals(key)) {
        if (sources == null) {
          sources = value; // store the sources list
          return true;
        } else {
          LOGGER.warn("Duplicate source list specified for agent: {}", agentName);
          addError(CONFIG_SOURCES, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }

      // Check for sinks
      if (CONFIG_SINKS.equals(key)) {
        if (sinks == null) {
          sinks = value; // store the sinks list
          LOGGER.info("Added sinks: {} Agent: {}", sinks, agentName);
          return true;
        } else {
          LOGGER.warn("Duplicate sink list specfied for agent: {}", agentName);
          addError(CONFIG_SINKS, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }

      // Check for channels
      if (CONFIG_CHANNELS.equals(key)) {
        if (channels == null) {
          channels = value; // store the channels list
          return true;
        } else {
          LOGGER.warn("Duplicate channel list specified for agent: {}", agentName);
          addError(CONFIG_CHANNELS, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }

      // Check for sinkgroups
      if (CONFIG_SINKGROUPS.equals(key)) {
        if (sinkgroups == null) {
          sinkgroups = value; // store the sinkgroups list
          return true;
        } else {
          LOGGER.warn("Duplicate sinkgroup list specfied for agent: {}", agentName);
          addError(CONFIG_SINKGROUPS, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }

      // The key part: each of these five methods reads the configuration
      // for its own kind of component.
      if (addAsSourceConfig(key, value)
          || addAsChannelValue(key, value)
          || addAsSinkConfig(key, value)
          || addAsSinkGroupConfig(key, value)
          || addAsConfigFilterConfig(key, value)
      ) {
        return true;
      }

      LOGGER.warn("Invalid property specified: {}", key);
      addError(key, INVALID_PROPERTY, ERROR);
      return false;
    }

    addAsSourceConfig(key, value) is used as the example here; the code is as follows:

    // The code below goes one call deeper at each step; it can also be read bottom-up.
    // 1
    private boolean addAsSourceConfig(String key, String value) {
      return addComponentConfig(
          key, value, CONFIG_SOURCES_PREFIX, sourceContextMap
      );
    }

    // ---> 2
    private boolean addComponentConfig(
        String key, String value, String configPrefix, Map<String, Context> contextMap
    ) {
      ComponentNameAndConfigKey parsed = parseConfigKey(key, configPrefix);
      if (parsed != null) {
        String name = parsed.getComponentName().trim(); // name of the component (source etc.)
        LOGGER.info("Processing:{}", name);
        Context context = contextMap.get(name);

        if (context == null) {
          LOGGER.debug("Created context for {}: {}", name, parsed.getConfigKey());
          context = new Context();
          contextMap.put(name, context); // register a new Context for this component name
        }
        context.put(parsed.getConfigKey(), value); // store the config key and its value in that Context
        return true;
      }

      return false;
    }

    // ---> 3
    private ComponentNameAndConfigKey parseConfigKey(String key, String prefix) {
      // e.g. <sources.source1.type, TAILDIR>
      // key must start with prefix
      if (!key.startsWith(prefix)) { // the key sources.source1.type does start with "sources."
        return null;
      }

      // key must have a component name part after the prefix of the format:
      // <prefix><component-name>.<config-key>
      int index = key.indexOf('.', prefix.length() + 1);

      if (index == -1) {
        return null;
      }

      String name = key.substring(prefix.length(), index); // cut source1 out of the key: the component name
      String configKey = key.substring(prefix.length() + name.length() + 1); // configKey is then "type"

      // name and config key must be non-empty
      if (name.isEmpty() || configKey.isEmpty()) {
        return null;
      }

      return new ComponentNameAndConfigKey(name, configKey); // wrap both in a ComponentNameAndConfigKey
    }

    // ---> 4
    private ComponentNameAndConfigKey(String name, String configKey) {
      this.componentName = name;
      this.configKey = configKey;
    }

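    Control then returns to the if (parsed != null) check at //2 and continues from there.

    So in the end each property is loaded into the matching *ContextMap of the AgentConfiguration.

    Finally, the FlumeConfiguration constructor has one more step:

    // validate and remove improperly configured components
    validateConfiguration();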
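    To see what ends up in a *ContextMap, the parsing can be replayed on a few sample keys. This is a minimal sketch under stated assumptions: the class ComponentKeySketch is hypothetical, a nested Map<String, String> stands in for Flume's Context, and the prefix "sources." is passed in explicitly rather than taken from CONFIG_SOURCES_PREFIX.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: "<prefix><component-name>.<config-key>" parsing into per-component maps.
class ComponentKeySketch {
    // component name -> (config key -> value); the inner map stands in for a Context
    final Map<String, Map<String, String>> contextMap = new HashMap<>();

    // Returns {componentName, configKey}, or null if the key does not match the prefix format.
    static String[] parse(String key, String prefix) {
        if (!key.startsWith(prefix)) {
            return null;
        }
        int index = key.indexOf('.', prefix.length() + 1);
        if (index == -1) {
            return null;
        }
        String name = key.substring(prefix.length(), index);
        String configKey = key.substring(index + 1);
        if (name.isEmpty() || configKey.isEmpty()) {
            return null;
        }
        return new String[] { name, configKey };
    }

    // Stores the value under its component, creating the component's map on first use.
    boolean add(String key, String value, String prefix) {
        String[] parsed = parse(key, prefix);
        if (parsed == null) {
            return false;
        }
        contextMap.computeIfAbsent(parsed[0], n -> new HashMap<>()).put(parsed[1], value);
        return true;
    }

    public static void main(String[] args) {
        ComponentKeySketch sketch = new ComponentKeySketch();
        sketch.add("sources.source1.type", "TAILDIR", "sources.");
        sketch.add("sources.source1.filegroups.f1", "/home/flume/test/logs/abc.log", "sources.");
        System.out.println(sketch.contextMap);
    }
}
```

    Note that everything after the component name stays one key, so filegroups.f1 is stored as a single dotted entry inside source1's map.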

    Its code in detail:

    private void validateConfiguration() {
      Iterator<String> it = agentConfigMap.keySet().iterator();
      while (it.hasNext()) {
        String agentName = it.next();
        AgentConfiguration aconf = agentConfigMap.get(agentName);

        if (!aconf.isValid()) {
          logger.warn("Agent configuration invalid for agent '" + agentName
              + "'. It will be removed.");
          errors.add(new FlumeConfigurationError(agentName, "",
              FlumeConfigurationErrorType.AGENT_CONFIGURATION_INVALID,
              ErrorOrWarning.ERROR));

          it.remove();
        }
        logger.debug("Channels:" + aconf.channels + "\n");
        logger.debug("Sinks " + aconf.sinks + "\n");
        logger.debug("Sources " + aconf.sources + "\n");
      }

      logger.info("Post-validation flume configuration contains configuration"
          + " for agents: " + agentConfigMap.keySet());
    }

    aconf.isValid()

    private boolean isValid() {
      logger.debug("Starting validation of configuration for agent: {}", agentName);
      if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
        logger.debug("Initial configuration: {}", this.getPrevalidationConfig());
      }

      // Make sure that at least one channel is specified
      if (channels == null || channels.trim().length() == 0) {
        logger.warn("Agent configuration for '" + agentName
            + "' does not contain any channels. Marking it as invalid.");
        errorList.add(new FlumeConfigurationError(agentName,
            BasicConfigurationConstants.CONFIG_CHANNELS,
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
            ErrorOrWarning.ERROR));
        return false;
      }

      channelSet = new HashSet<String>(Arrays.asList(channels.split("\\s+")));

      // validateComponent(channelSet, channelConfigMap, CLASS_CHANNEL,
      // ATTR_TYPE);

      channelSet = validateChannels(channelSet);
      if (channelSet.size() == 0) {
        logger.warn("Agent configuration for '" + agentName
            + "' does not contain any valid channels. Marking it as invalid.");
        errorList.add(new FlumeConfigurationError(agentName,
            BasicConfigurationConstants.CONFIG_CHANNELS,
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
            ErrorOrWarning.ERROR));
        return false;
      }

      sourceSet = validateSources(channelSet);
      sinkSet = validateSinks(channelSet);
      sinkgroupSet = validateGroups(sinkSet);

      // If no sources or sinks are present, then this is invalid
      if (sourceSet.size() == 0 && sinkSet.size() == 0) {
        logger.warn("Agent configuration for '" + agentName
            + "' has no sources or sinks. Will be marked invalid.");
        errorList.add(new FlumeConfigurationError(agentName,
            BasicConfigurationConstants.CONFIG_SOURCES,
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
            ErrorOrWarning.ERROR));
        errorList.add(new FlumeConfigurationError(agentName,
            BasicConfigurationConstants.CONFIG_SINKS,
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
            ErrorOrWarning.ERROR));
        return false;
      }

      // Now rewrite the sources/sinks/channels
      this.sources = getSpaceDelimitedList(sourceSet);
      this.channels = getSpaceDelimitedList(channelSet);
      this.sinks = getSpaceDelimitedList(sinkSet);
      this.sinkgroups = getSpaceDelimitedList(sinkgroupSet);

      if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
        logger.debug("Post validation configuration for {}", agentName);
        logger.debug(this.getPostvalidationConfig());
      }

      return true;
    }

    validateChannels

    private Set<String> validateChannels(Set<String> channelSet) {
      Iterator<String> iter = channelSet.iterator();
      Map<String, Context> newContextMap = new HashMap<String, Context>();
      ChannelConfiguration conf = null;
      while (iter.hasNext()) {
        String channelName = iter.next();
        Context channelContext = channelContextMap.get(channelName);
        // Context exists in map.
        if (channelContext != null) {
          // Get the configuration object for the channel:
          ChannelType chType = getKnownChannel(channelContext.getString(
              BasicConfigurationConstants.CONFIG_TYPE));
          boolean configSpecified = false;
          String config = null;
          // Not a known channel - cannot do specific validation to this channel
          if (chType == null) {
            config = channelContext.getString(BasicConfigurationConstants.CONFIG_CONFIG);
            if (config == null || config.isEmpty()) {
              config = "OTHER";
            } else {
              configSpecified = true;
            }
          } else {
            config = chType.toString().toUpperCase(Locale.ENGLISH);
            configSpecified = true;
          }

          try {
            // create the channel's configuration object
            conf = (ChannelConfiguration) ComponentConfigurationFactory.create(
                channelName, config, ComponentType.CHANNEL);
            logger.debug("Created channel " + channelName);
            if (conf != null) {
              // configure it from the context
              conf.configure(channelContext);
            }
            if ((configSpecified && conf.isNotFoundConfigClass()) || !configSpecified) {
              newContextMap.put(channelName, channelContext);
            } else if (configSpecified) {
              channelConfigMap.put(channelName, conf);
            }
            if (conf != null) {
              errorList.addAll(conf.getErrors());
            }
          } catch (ConfigurationException e) {
            // Could not configure channel - skip it.
            // No need to add to error list - already added before exception is
            // thrown
            if (conf != null) errorList.addAll(conf.getErrors());
            iter.remove();
            logger.warn("Could not configure channel " + channelName
                + " due to: " + e.getMessage(), e);
          }
        } else {
          iter.remove();
          errorList.add(new FlumeConfigurationError(agentName, channelName,
              FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
        }
      }
      channelContextMap = newContextMap;
      Set<String> tempchannelSet = new HashSet<String>();
      tempchannelSet.addAll(channelConfigMap.keySet());
      tempchannelSet.addAll(channelContextMap.keySet());
      channelSet.retainAll(tempchannelSet);
      return channelSet;
    }
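    Stripped of error reporting, the validation above boils down to set operations: split the declared name list on whitespace, then keep only the names that actually have configuration. The sketch below shows just that core; the class ChannelListValidationSketch is hypothetical, and the sorted output of spaceDelimited is an assumption made for readability, since getSpaceDelimitedList is not shown in this post.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of "declared names intersected with configured names".
class ChannelListValidationSketch {
    // declared: the raw value of e.g. "channels"; configured: names that have properties.
    static Set<String> validate(String declared, Set<String> configured) {
        Set<String> names = new HashSet<>();
        if (declared == null || declared.trim().isEmpty()) {
            return names; // nothing declared: the agent would be marked invalid
        }
        names.addAll(Arrays.asList(declared.trim().split("\\s+")));
        names.retainAll(configured); // drop names without any configuration
        return names;
    }

    // Rebuilds a space-delimited list; sorting is an assumption for stable output.
    static String spaceDelimited(Set<String> names) {
        return String.join(" ", new TreeSet<>(names));
    }

    public static void main(String[] args) {
        Set<String> configured = new HashSet<>(Arrays.asList("channel1"));
        System.out.println(spaceDelimited(validate("channel1 channel2", configured)));
    }
}
```

    With the sample configuration, a channel listed in agent1.channels but lacking any agent1.channels.<name>.* lines would be dropped at this stage.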

    At this point a complete FlumeConfiguration object has been fully parsed. Next, the components are created.

    // all component information for this agent
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
        for (String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(channelName);
          if (channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap =
                channelCache.get(channelComponent.channel.getClass());
            if (nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    }

    Channels are used as the example here.

    private void loadChannels(AgentConfiguration agentConf,
        Map<String, ChannelComponent> channelComponentMap)
        throws InstantiationException {
      LOGGER.info("Creating channels");

      ListMultimap<Class<? extends Channel>, String> channelsNotReused =
          ArrayListMultimap.create();
      // assume all channels will not be re-used
      for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
           channelCache.entrySet()) {
        Class<? extends Channel> channelKlass = entry.getKey();
        Set<String> channelNames = entry.getValue().keySet();
        channelsNotReused.get(channelKlass).addAll(channelNames);
      }

      Set<String> channelNames = agentConf.getChannelSet();
      Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
      /*
       * Components which have a ComponentConfiguration object
       */
      for (String chName : channelNames) {
        ComponentConfiguration comp = compMap.get(chName);
        if (comp != null) {
          Channel channel = getOrCreateChannel(channelsNotReused,
              comp.getComponentName(), comp.getType());
          try {
            Configurables.configure(channel, comp);
            channelComponentMap.put(comp.getComponentName(),
                new ChannelComponent(channel));
            LOGGER.info("Created channel " + chName);
          } catch (Exception e) {
            String msg = String.format("Channel %s has been removed due to an " +
                "error during configuration", chName);
            LOGGER.error(msg, e);
          }
        }
      }
      /*
       * Components which DO NOT have a ComponentConfiguration object
       * and use only Context
       */
      for (String chName : channelNames) {
        Context context = agentConf.getChannelContext().get(chName);
        if (context != null) {
          Channel channel = getOrCreateChannel(channelsNotReused, chName,
              context.getString(BasicConfigurationConstants.CONFIG_TYPE));
          try {
            Configurables.configure(channel, context);
            channelComponentMap.put(chName, new ChannelComponent(channel));
            LOGGER.info("Created channel " + chName);
          } catch (Exception e) {
            String msg = String.format("Channel %s has been removed due to an " +
                "error during configuration", chName);
            LOGGER.error(msg, e);
          }
        }
      }
      /*
       * Any channel which was not re-used, will have it's reference removed
       */
      for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
        Map<String, Channel> channelMap = channelCache.get(channelKlass);
        if (channelMap != null) {
          for (String channelName : channelsNotReused.get(channelKlass)) {
            if (channelMap.remove(channelName) != null) {
              LOGGER.info("Removed {} of type {}", channelName, channelKlass);
            }
          }
          if (channelMap.isEmpty()) {
            channelCache.remove(channelKlass);
          }
        }
      }
    }

    The code above creates the channels.

    See also:
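    The bookkeeping around channelsNotReused is easier to follow in miniature. getOrCreateChannel is not shown in this post, so the sketch below simply assumes it looks a channel up by type and name, creates it on a miss, and marks it as reused. The class ChannelCacheSketch is hypothetical; String type names stand in for channel classes and plain Objects for Channel instances.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of "reuse cached channels across loads, evict the rest".
class ChannelCacheSketch {
    // type -> (name -> channel instance)
    final Map<String, Map<String, Object>> cache = new HashMap<>();

    // wanted maps channel name -> channel type for one configuration load.
    void reload(Map<String, String> wanted) {
        // Step 1: assume no cached channel will be reused.
        Map<String, Set<String>> notReused = new HashMap<>();
        for (Map.Entry<String, Map<String, Object>> e : cache.entrySet()) {
            notReused.put(e.getKey(), new HashSet<>(e.getValue().keySet()));
        }
        // Step 2: get or create each wanted channel and mark it as reused.
        for (Map.Entry<String, String> e : wanted.entrySet()) {
            String name = e.getKey();
            String type = e.getValue();
            cache.computeIfAbsent(type, t -> new HashMap<>())
                 .computeIfAbsent(name, n -> new Object());
            Set<String> names = notReused.get(type);
            if (names != null) {
                names.remove(name);
            }
        }
        // Step 3: evict whatever was not reused, and drop empty per-type maps.
        for (Map.Entry<String, Set<String>> e : notReused.entrySet()) {
            Map<String, Object> byName = cache.get(e.getKey());
            if (byName == null) {
                continue;
            }
            byName.keySet().removeAll(e.getValue());
            if (byName.isEmpty()) {
                cache.remove(e.getKey());
            }
        }
    }
}
```

    The point of the cache is that a channel keeping the same name and type across a reload stays the same instance, so a memory channel would not lose its buffered events just because the file was re-read.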

    https://www.cnblogs.com/aquariusm/p/6118976.html

