KafkaProducer (Java edition)

    xiaoxiao · 2024-12-23

    Prerequisites:

    Start the ZooKeeper and Kafka processes:

    zkServer.sh start
    nohup kafka-server-start.sh $KAFKA_HOME/config/server.properties &

    Use tail -F nohup.out to follow the startup log.

    1. Create a topic

    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper spark001:2181 --replication-factor 1 --partitions 1 --topic lgoffset

    Tip: list existing topics with kafka-topics.sh --list --zookeeper spark001:2181

    2. First verify from the console that Kafka is working properly. Console producer:

    kafka-console-producer.sh --broker-list spark001:9092 --topic lgoffset

    Console consumer:

    kafka-console-consumer.sh --zookeeper spark001:2181 --topic lgoffset

    At this point, if messages typed into the producer flow out of the consumer, everything is working.

    Once this step is verified, you can run all kinds of tests: the producer can be logs collected by Flume, or Java code.

    KafkaProducerV2 (kafka-0-10_2.11)

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
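    If you only need the producer itself (without Spark Streaming), the kafka-clients artifact alone should be enough. A sketch, assuming version 0.10.2.1 (any 0.10.x client should match the API used in this post):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>
```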

    The Java code:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    /**
     * Description: kafka-0-10_2.11
     *
     * @Author: 留歌36
     * @Date: 2019/5/26 12:08
     */
    public class KafkaProducerV2 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("bootstrap.servers", "192.168.1.103:9092");

            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>(
                        "test", Integer.toString(i), "留歌------------" + Integer.toString(i)));
            }
            // close() flushes buffered records; send() is asynchronous, so
            // without it records still in the buffer may be lost on exit
            producer.close();
            System.out.println("================== finished producing ================");
        }
    }

    Run the code above and the produced messages appear in kafka-console-consumer. A brief note: different Kafka versions may require different configuration parameters.

    See the official docs: https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
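    Since the lgoffset topic above was created with --partitions 1, every record lands in partition 0 regardless of key. With more partitions, the producer's default partitioner hashes the record key to choose a partition (the real client uses murmur2; the plain-Java sketch below uses String.hashCode() only to illustrate the idea):

```java
// Illustration only: how a record key maps to a partition.
// Kafka's DefaultPartitioner uses murmur2, not String.hashCode().
class PartitionSketch {
    static int choosePartition(String key, int numPartitions) {
        // mask off the sign bit so the result is a valid non-negative index
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // with a single partition, every key maps to partition 0
        for (int i = 0; i < 5; i++) {
            System.out.println("key=" + i + " -> partition "
                    + choosePartition(Integer.toString(i), 1));
        }
    }
}
```

    This is why, with one partition, consumption order matches production order: all records go through the same partition.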

    KafkaProducerV1 (kafka-0-8_2.11)

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    The Java code:

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import java.io.Serializable;
    import java.util.Properties;
    import java.util.UUID;

    /**
     * Description: kafka-0-8_2.11 (spark-streaming-kafka-0-8_2.11, version 2.4.0)
     *
     * @Author: 留歌36
     * @Date: 2019/5/26 12:34
     */
    public class KafkaProducerV1 implements Serializable {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            properties.put("metadata.broker.list", "192.168.1.103:9092");
            properties.put("request.required.acks", "1");

            ProducerConfig producerConfig = new ProducerConfig(properties);
            Producer<String, String> producer = new Producer<String, String>(producerConfig);

            String topic = "test";
            for (int i = 0; i < 100; i++) {
                producer.send(new KeyedMessage<String, String>(topic, i + "", "test data: " + UUID.randomUUID()));
            }
            // release the network resources held by the old producer
            producer.close();
            System.out.println("================== finished producing (V0.8) ================");
        }
    }

    Both versions work.
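    Both producers point at the same broker; what changes between the 0.8 and 0.10 clients is mostly the property names. A side-by-side of the settings used in the two examples above (the broker address is this post's; adjust to your own):

```java
import java.util.Properties;

// Compares the producer settings of the new (0.10+) and old (0.8) Kafka clients.
class KafkaProps {
    // new-client (0.10+) producer settings
    static Properties newClient() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "192.168.1.103:9092"); // replaces metadata.broker.list
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // old scala-client (0.8) producer settings
    static Properties oldClient() {
        Properties p = new Properties();
        p.put("metadata.broker.list", "192.168.1.103:9092");
        p.put("serializer.class", "kafka.serializer.StringEncoder"); // one encoder for key and value
        p.put("request.required.acks", "1");
        return p;
    }

    public static void main(String[] args) {
        System.out.println("new client: " + newClient());
        System.out.println("old client: " + oldClient());
    }
}
```

    Mixing the two naming schemes is a common source of "producer silently does nothing" problems, which is why checking the javadoc for your exact version matters.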
