kafka是一款热门的消息缓存框架,它不是遵循jms规范,但其模型是生产者与消费者模型。因为它有优秀的高并发性能,处理速度惊人,使得它成为众多企业青睐的对象。在理论上,使用了pageCache和sendFile两种技术,使得其顺序IO的速度为100kb/s,而随机IO的速度是600mb/s。接下来,我们来搭建kafka集群。
首先,我们需要集群搭建zookeeper,然后启动zookeeper集群
其次,我们搭建kafka集群,完成kafka集群的启动
最后,测试kafka是否搭建成功
下载zookeeper安装包,地址: zookeeper-3.4.13
这里我使用的版本是3.4.13,各位看官按照自己的喜好下载解压zookeeper包,并修改名称
tar -zxvf zookeeper-3.4.13.tar.gz /usr/local mv /usr/local/zookeeper-3.4.13 /usr/local/zookeeper修改zookeeper配置信息
vi zoo.cfg //新建一个zoo.cfg文本 配置以下信息: # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/usr/local/zookeeper/data # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 server.0=hadoop1:2888:3888 server.1=hadoop2:2888:3888 server.2=hadoop3:2888:3888 server.3=hadoop4:2888:3888 注:我这里配置了4台zookeeper集群,其实建议配置奇数台,因为跟zookeeper的内部主从选举机制使用的算法相关。以上配置信息都有注释说明,看不懂的使用翻译工具凑合看哈哈~根据配置文件的存放的路径,在其目录下创建一个机器标识号
mkdir -p /usr/local/zookeeper/data cd myid vi myid //我这里使用了4台机器,分别标识为0,1,2,3,且不能重复,不然报错 如上图所示,只需要填写一个数字保存即可,但是机器与机器之间的标识码不能重复建议配置的路径在zookeeper安装目录中,以便分发到各台机器上。配置zookeeper的环境变量
vi /etc/profile export ZK_HOME=/usr/local/zookeeper export PARH=$PATH:$ZK_HOME/bin source /etc/profile分发配置好的zookeeper到各台机器上
scp -r /usr/local/zookeeper hadoop2:/usr/local/ scp -r /usr/local/zookeeper hadoop3:/usr/local/ scp -r /usr/local/zookeeper hadoop4:/usr/local/修改分发好的机器的myid,并配置环境变量,如:
我这里四台机器分别为hadoop1,hadoop2,hadoop3,hadoop4 hadoop1的myid为:0 hadoop2的myid为:1 hadoop3的myid为:2 hadoop4的myid为:3配置环境变量与上面步骤保持一致
在每台机器上启动zookeeper,执行命令:
zkServer.sh start查看启动之后的zookeeper状态,执行命令:
zkServer.sh status 集群机器中只有一台是leader,其余都是follower,如果leader节点挂了,又会进行重新选举如果每台机器都跟上图一样,则zookeeper搭建成功。然而,也会有失败的时候,原因很多,常见的可能就是配置错误,然后启动集群或者启动部分机器,而另一部分没启动,导致下一次全部启动出错。
最快速的解决方法:清理每台机器中zookeeper中的data目录,保留myid文件,其余都删除了,所有的机器都清理干净之后,再次启动。
非暴力解决方法:查看zookeeper的日志,定位出现的问题,分析原因然后在网上搜索资料解决
下载kafka安装包,地址: kafka_2.12-2.2.0
我这里安装的是2.12版本解压kafka安装包,并修改安装路径
cd /root //切换到下载目录 tar -zxvf kafka_2.12-2.2.0.tgz /usr/local mv /usr/local/kafka_2.12-2.2.0 /usr/local/kafka创建配置文件,填写配置信息
vi hadoop.properties 填写的配置信息如下: broker的唯一编号,不能重复 broker.id=0 #用来监听链接的端口,生存者和消费者将在此端口建立连接 port=9092 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘IO的现成数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接受套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka运行日志存放的路径 log.dirs=/usr/local/kafka/logs #topic在当前broker上的分片个数 num.partitions=2 #用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment文件保留的最长时间,超时将被删除 log.retention.hours=168 #滚动生成新的segment文件的最大时间 log.roll.hours=168 #周期性检查文件大小时间 log.retention.check.interval.ms=300000 #日志清理是否打开 log.cleaner.enable=true zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181,hadoop4:2181 #zk连接超时时长 zookeeper.connection.toimeout.ms=6000 #partion buffer 消息的条数达到阈值,将触发flush到磁盘 log.flush.interval.messages=10000 #消息buffer的时间,达到阈值,将触发flush到磁盘 log.flush.interval.ms=3000 #删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除 delete.topic.enable=true 本机的hostname host.name=hadoop1 本机的ip信息 advertised.host.name=192.168.1.101 #listeners=PLAINTEXT://hadoop1:9092 #advertised.listeners=PLAINTEXT://hadoop1:9092 配置文件有几处地方需要自己修改 broker.id :作为一个机器标识码,从0开始配置,且机器之间不能重复log.dirs : kafka存放日志的路径,需要自己去创建,建议创建在kafka安装目录中,方便机器分发zookeeper.connect : 配置zookeeper集群连接,根据自己需要配置host.name : 配置本机的主机名advertised.host.name : 配置本机的ip地址,消费者通过ip去拉取消息创建kafka存放日志的路径
mkdir -p /usr/local/kafka/logs配置kafka的环境变量
vi /etc/profile export KAFKA_HOME=/usr/local/kafka export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile分发配置好的kafka到其他机器上
scp -r /usr/local/kafka hadoop2:/usr/local scp -r /usr/local/kafka hadoop3:/usr/local scp -r /usr/local/kafka hadoop4:/usr/local每台机器按照上述步骤配置好环境变量,在配置文件中修改上述提及的本机ip相关信息
启动kafka集群,这里我们使用后台启动4个broker节点(hadoop1,hadoop2,hadoop3,hadoop4),每台都执行如下命令:
nohup kafka-server-start.sh /usr/local/kafka/config/hadoop.propertieskafka进程:
每台broker节点都启动之后,我们测试一下kafka是否能生产和消费消息
首先为kafka创建一个topic,相当于消费和生产需要的标识符,通过这个标识符来对接两方。
kafka-topics.sh --create --replication-factor 2 --partitions 1 --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181,hadoop4:2181 --topic test replication-factor指定broker的备份数量partitions 指定broker的分区数量,消息会平均存放到分区中zookeeper 指定zk的连接topic 指定topic的名称kafka-topics.sh --list --zookeeper hadoop1:2181 //查看主题
kafka-topics.sh --delete --zookeeper hadoop1:2181 --topic test //删除主题
现在任意一台机器上执行启动生产者客户端命令:
kafka-console-producer.sh --broker-list hadoop1:9092 --topic在任意一台机器上执行消费者客户端命令:
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic testproducer客户端,如图:
consumer客户端,如图: