Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。 注意:Kafka并没有遵循JMS规范,它只提供了发布和订阅通讯方式。 kafka中文官网:http://kafka.apachecn.org/quickstart.html
1.日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。 2.消息系统:解耦和生产者和消费者、缓存消息等。 3.用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。 4.运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。 5.流式处理:比如spark streaming和storm 6.事件源
1.kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成。 2.每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。 3.每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。 4.发布者发到某个topic的消息会被均匀的分布到多个partition上(或根据用户指定的路由规则进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
1.JDK安装 2.zookeeper集群搭建,可以参考我的另外博客:https://blog.csdn.net/dsen726/article/details/89883920 注意:记得关闭防火墙
服务器ip名称192.168.8.63broker0(zk + kafka)192.168.8.153broker1(zk + kafka)192.168.8.110broker2(zk + kafka)3台虚拟机均进行以下操作: 命令1:
cd /usr/local命令2:
wget http://mirror.bit.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz命令3:
tar -zxvf kafka_2.11-1.0.0.tgz命令4:
mv kafka_2.12-0.11.0.0 kafka命令5:修改配置文件
vi ./kafka/config/server.propertiesbroker0 需要修改的内容如下(192.168.8.63)
broker.id=0 listeners=PLAINTEXT://192.168.8.63:9092 zookeeper.connect=192.168.8.63:2181,192.168.8.153:2181,192.168.8.110:2181broker1 需要修改的内容如下(192.168.8.153)
broker.id=1 listeners=PLAINTEXT://192.168.8.153:9092 zookeeper.connect=192.168.8.63:2181,192.168.8.153:2181,192.168.8.110:2181broker2 需要修改的内容如下(192.168.8.110)
broker.id=2 listeners=PLAINTEXT://192.168.8.110:9092 zookeeper.connect=192.168.8.63:2181,192.168.8.153:2181,192.168.8.110:2181命令6: // 在系统环境中配置kafka的路径
vi /etc/profile// 在文件最下方添加kafka路径,多路径PATH写法为PATH= Z O O K E E P E R H O M E / b i n : {ZOOKEEPER_HOME}/bin: ZOOKEEPERHOME/bin:{KAFKA_HOME}/bin:$PATH
export KAFKA_HOME=/usr/local/kafka PATH=${KAFKA_HOME}/bin:$PATH export PATH命令7:使修改完的环境变量生效
source /etc/profile命令1:开启3台虚拟机的zookeeper程序
/usr/local/zookeeper/bin/zkServer.sh start命令2:开启成功后查看zookeeper集群的状态
/usr/local/zookeeper/bin/zkServer.sh status出现Mode:follower或是Mode:leader则代表成功 命令3:在后台开启3台虚拟机的kafka程序(cd /usr/local/kafka)
./bin/kafka-server-start.sh -daemon config/server.properties命令4:在其中一台虚拟机(192.168.8.110)创建topic
/usr/local/kafka/bin/kafka-topics.sh –create –zookeeper 192.168.8.110:2181 –replication-factor 3 –partitions 3 –topic test命令5:查看创建的topic信息
/usr/local/kafka/bin/kafka-topics.sh –describe –zookeeper 192.168.131.130:2181 –topic my-replicated-topic命令6:在其中一台虚拟机(192.168.8.110)发消息(生产)
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.8.110:9092 --topic test命令7:(消费)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.153:9092 --topic test --from-beginning