Kafka's main entry point starts up a KafkaServerStartable, which is a thin wrapper around KafkaServer:
```scala
val kafkaServerStartable = new KafkaServerStartable(serverConfig)
kafkaServerStartable.startup
```

```scala
package kafka.server

class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
  private var server: KafkaServer = null

  private def init() {
    server = new KafkaServer(serverConfig)
  }

  init() // called from the constructor body, so `server` is set before startup()

  def startup() {
    try {
      server.startup()
    }
    catch {...}
  }
}
```
KafkaServer represents a single Kafka broker and is the core of Kafka. Just looking at which modules startup() brings up tells you what work a broker does; each module is examined in turn below.
```scala
package kafka.server

/**
 * Represents the lifecycle of a single Kafka broker. Handles all functionality required
 * to start up and shutdown a single Kafka node.
 */
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
  var socketServer: SocketServer = null
  var requestHandlerPool: KafkaRequestHandlerPool = null
  var logManager: LogManager = null
  var kafkaHealthcheck: KafkaHealthcheck = null
  var topicConfigManager: TopicConfigManager = null
  var replicaManager: ReplicaManager = null
  var apis: KafkaApis = null
  var kafkaController: KafkaController = null
  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  var zkClient: ZkClient = null

  /**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup() {
    /* start scheduler */
    kafkaScheduler.startup()

    /* setup zookeeper */
    zkClient = initZk()

    /* start log manager */
    logManager = createLogManager(zkClient)
    logManager.startup()

    socketServer = new SocketServer(config.brokerId,
                                    config.hostName,
                                    config.port,
                                    config.numNetworkThreads,
                                    config.queuedMaxRequests,
                                    config.socketSendBufferBytes,
                                    config.socketReceiveBufferBytes,
                                    config.socketRequestMaxBytes)
    socketServer.startup()

    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
    kafkaController = new KafkaController(config, zkClient)

    /* start processing requests */
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

    replicaManager.startup()

    kafkaController.startup()

    topicConfigManager = new TopicConfigManager(zkClient, logManager)
    topicConfigManager.startup()

    /* tell everyone we are alive */
    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
    kafkaHealthcheck.startup()
  }
```

2.1 KafkaScheduler
KafkaScheduler executes background tasks and is implemented on top of ScheduledThreadPoolExecutor.
```scala
package kafka.utils

/**
 * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
 *
 * It has a pool of kafka-scheduler- threads that do the actual work.
 *
 * @param threads The number of threads in the thread pool
 * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
 * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
 */
@threadsafe
class KafkaScheduler(val threads: Int,
                     val threadNamePrefix: String = "kafka-scheduler-",
                     daemon: Boolean = true) extends Scheduler with Logging {
  @volatile private var executor: ScheduledThreadPoolExecutor = null
  private val schedulerThreadId = new AtomicInteger(0) // numbers the pool threads; declared in the full source

  override def startup() {
    this synchronized {
      executor = new ScheduledThreadPoolExecutor(threads) // create the ScheduledThreadPoolExecutor
      executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
      executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
      executor.setThreadFactory(new ThreadFactory() {
        def newThread(runnable: Runnable): Thread =
          Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
      })
    }
  }

  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) = {
    val runnable = new Runnable { // wrap fun in a Runnable
      def run() = {
        try {
          fun()
        } catch {...}
        finally {...}
      }
    }
    if (period >= 0) // a non-negative period means a recurring task
      executor.scheduleAtFixedRate(runnable, delay, period, unit)
    else // otherwise a one-shot task, run once after the delay
      executor.schedule(runnable, delay, unit)
  }
```
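To see the API in action, here is a hypothetical usage example (the task name and body are made up; the broker schedules its own jobs, such as log retention, the same way):

```scala
import java.util.concurrent.TimeUnit

// Hypothetical task, just to show how schedule() is called:
val scheduler = new KafkaScheduler(threads = 2)
scheduler.startup()
scheduler.schedule(name = "example-heartbeat",
                   fun = () => println("still alive"),
                   delay = 10 * 1000,   // first run after 10s
                   period = 30 * 1000,  // then every 30s; a negative period would mean run once
                   unit = TimeUnit.MILLISECONDS)
```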
2.2 Zookeeper Client

Since Kafka keeps its metadata and configuration in ZooKeeper, the broker needs to create a ZkClient to communicate with the ZooKeeper ensemble.
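A condensed sketch of what this setup looks like, following the 0.8 initZk() (the real method also handles an optional chroot suffix in zk.connect, which is omitted here):

```scala
private def initZk(): ZkClient = {
  info("Connecting to zookeeper on " + config.zkConnect)
  // ZKStringSerializer stores znode data as plain strings rather than Java-serialized objects
  val zkClient = new ZkClient(config.zkConnect,
                              config.zkSessionTimeoutMs,
                              config.zkConnectionTimeoutMs,
                              ZKStringSerializer)
  // make sure the standard persistent paths (/brokers/ids, /brokers/topics, ...) exist
  ZkUtils.setupCommonPaths(zkClient)
  zkClient
}
```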
2.3 logManager
As the class doc puts it: "The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning."

See: Apache Kafka源码分析 – Log Management
2.4 ReplicaManager
The replica-related modules were newly added in 0.8. See:

Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
Apache Kafka源码分析 – ReplicaManager
2.5 Kafka Socket Server
First of all, the broker is a socket server: all interaction with a broker happens by sending requests to its socket port.
```scala
socketServer = new SocketServer(config.brokerId...)
```

KafkaApis: this class encapsulates the handling logic for every type of request.
KafkaRequestHandler: the handler threads that take requests off the request channel and hand them to KafkaApis for processing.
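A simplified view of the handler loop, condensed from the 0.8 KafkaRequestHandler (logging and error handling omitted):

```scala
class KafkaRequestHandler(id: Int, brokerId: Int,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis) extends Runnable {
  def run() {
    while (true) {
      // blocks until a network thread queues a request on the shared channel
      val req = requestChannel.receiveRequest()
      if (req eq RequestChannel.AllDone)
        return // sentinel injected at shutdown
      apis.handle(req) // every request type is dispatched through KafkaApis
    }
  }
}
```

KafkaRequestHandlerPool simply starts config.numIoThreads of these runnables against the SocketServer's request channel.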
2.6 offsetManager
```scala
offsetManager = createOffsetManager()
```

The offset manager periodically purges expired offset entries, i.e. a compact operation:

```scala
scheduler.schedule(name = "offsets-cache-compactor",
                   fun = compact,
                   period = config.offsetsRetentionCheckIntervalMs,
                   unit = TimeUnit.MILLISECONDS)
```

It also implements various consumer-related offset operations, which are not examined further here since we do not use the high-level consumer.
2.7 KafkaController
```scala
kafkaController = new KafkaController(config, zkClient, brokerState)
```

See: Apache Kafka源码分析 – Controller

Since 0.8, to manage replicas, one broker is elected as the master, i.e. the controller, which coordinates replica consistency across the cluster.
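Electing that controller is a standard ZooKeeper leader election. A minimal sketch of the idea (the real logic lives in ZookeeperLeaderElector; the znode payload and failover handling are simplified here):

```scala
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

def elect(zkClient: ZkClient, brokerId: Int): Boolean = {
  try {
    // whoever creates the ephemeral /controller znode first becomes the controller;
    // the znode disappears with its session, which triggers a re-election
    ZkUtils.createEphemeralPathExpectConflict(zkClient, "/controller", brokerId.toString)
    true
  } catch {
    case e: ZkNodeExistsException => false // another broker won the election
  }
}
```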
2.8 TopicConfigManager
```scala
topicConfigManager = new TopicConfigManager(zkClient, logManager)
```

TopicConfigManager handles topic config changes. Besides the global configuration, Kafka also supports what is called topic-level configuration.
For example, you can set an override like this:

> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000

So how do these topic configs take effect?

Topic-level config is stored by default under

/brokers/topics/<topic_name>/config

But when there are many topics, watching each of these znodes would create too many watchers, so a separate directory is used to signal configuration changes:

/brokers/config_changes

The command above therefore does two things: it writes the config to the topic's config znode, and it adds a notification telling the watcher which topic's config has changed, e.g.

/brokers/config_changes/config_change_13321

The notification carries a sequence suffix, which is used to tell whether it has already been processed.
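A sketch of the write side, condensed from AdminUtils in 0.8 (the exact JSON payload and path helpers are simplifications here, not the verbatim API):

```scala
import java.util.Properties
import scala.collection.JavaConversions

def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
  // 1. persist the merged overrides under the topic's config znode
  val configMap = Map("version" -> 1, "config" -> JavaConversions.mapAsScalaMap(configs))
  ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(configMap))
  // 2. create a persistent-sequential notification znode; ZooKeeper appends the sequence
  //    number that processConfigChanges() later compares against lastExecutedChange
  zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/config_change_", Json.encode(topic))
}
```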
On the broker side, TopicConfigManager processes these notifications:

```scala
/**
 * Process the given list of config changes
 */
private def processConfigChanges(notifications: Seq[String]) {
  if (notifications.size > 0) {
    info("Processing config change notification(s)...")
    val now = time.milliseconds
    val logs = logManager.logsByTopicPartition.toBuffer
    val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
    for (notification <- notifications) {
      val changeId = changeNumber(notification)
      if (changeId > lastExecutedChange) { // not processed yet
        val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
        val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
        if (jsonOpt.isDefined) {
          val json = jsonOpt.get
          val topic = json.substring(1, json.length - 1) // hacky way to dequote; extracts the topic name from the notification
          if (logsByTopic.contains(topic)) {
            /* combine the default properties with the overrides in zk to create the new LogConfig */
            val props = new Properties(logManager.defaultConfig.toProps)
            props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
            val logConfig = LogConfig.fromProps(props)
            for (log <- logsByTopic(topic))
              log.config = logConfig // this is where the log's config is actually updated
            info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
            purgeObsoleteNotifications(now, notifications) // delete notifications past the retention window (10 minutes)
          }
        }
        lastExecutedChange = changeId
      }
    }
  }
}
```

Failover is not a problem here: applying the same config more than once is harmless, every startup re-processes all unexpired notifications, and a restarted broker loads the complete config from ZooKeeper anyway. This mechanism mainly exists to update topic configs in real time.
2.9 KafkaHealthcheck
```scala
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
```

This one is simple: as the comment in startup() says, it "tells everyone we are alive". The implementation registers an ephemeral znode at

/brokers/ids/[0...N] --> advertisedHost:advertisedPort

and re-registers whenever the session expires, a classic ZooKeeper pattern. So watching this path is all it takes to know whether a broker is still alive.
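Condensed from KafkaHealthcheck in 0.8 (the jmx port and hostname fallback handling are omitted, and the register call is simplified):

```scala
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState

class KafkaHealthcheck(brokerId: Int, host: String, port: Int,
                       sessionTimeoutMs: Int, zkClient: ZkClient) {
  def startup() {
    zkClient.subscribeStateChanges(new SessionExpireListener) // get told about session events
    register()
  }

  def register() {
    // the znode is ephemeral: it disappears automatically when this broker's session dies
    ZkUtils.registerBrokerInZk(zkClient, brokerId, host, port, sessionTimeoutMs)
  }

  class SessionExpireListener extends IZkStateListener {
    def handleStateChanged(state: KeeperState) {} // nothing to do; ZkClient reconnects by itself
    def handleNewSession() {
      // the session expired, so the old ephemeral znode is gone: announce ourselves again
      register()
    }
  }
}
```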
2.10 ControlledShutdown
Before 0.8, broker startup and shutdown were simple: initialize the components above, or stop them, and you were done.
Since 0.8, with replication added, a broker can no longer simply shut itself down. It must first notify the controller and wait for the controller to finish its work, such as migrating partition leadership or taking replicas offline; only then can it shut down.
private def controlledShutdown() is fairly long, but the logic is simple: find the controller, send it a ControlledShutdownRequest, and wait for the response; if this ultimately fails, the result is an unclean shutdown.
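A heavily condensed outline of that loop (finding the controller via ZooKeeper and opening the BlockingChannel to it are elided; names follow the 0.8 source but this is a sketch, not the verbatim method):

```scala
import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse}
import kafka.common.ErrorMapping
import kafka.network.BlockingChannel

private def controlledShutdown(channel: BlockingChannel, brokerId: Int,
                               maxRetries: Int, retryBackoffMs: Long): Boolean = {
  var remainingRetries = maxRetries
  while (remainingRetries > 0) {
    remainingRetries -= 1
    // ask the controller to migrate partition leadership away from this broker
    channel.send(new ControlledShutdownRequest(0, brokerId))
    val response = ControlledShutdownResponse.readFrom(channel.receive().buffer)
    // shutdown may proceed only once no led partitions remain on this broker
    if (response.errorCode == ErrorMapping.NoError && response.partitionsRemaining.isEmpty)
      return true
    Thread.sleep(retryBackoffMs)
  }
  false // retries exhausted: the caller falls back to an unclean shutdown
}
```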
This article is excerpted from 博客园 (cnblogs); original publication date: 2014-02-14.