《Flink官方文档》监控Wikipedia 编辑流(二)

    xiaoxiao2024-03-11  149

    本示例应该能让你开始写自己的Flink程序。为了学习更多,你可以打开我们的基本概念和DataStream API的指南。如果你想学习如何构建一个Flink集群在自己机器上并将结果写入Kafka,请看接下来的激励练习。

    激励练习:在一个Flink集群上运行,并将结果写入Kafka

    请按照我们的快速开始里面的内容来在你的机器上构建一个Flink分布式,再参考Kafka的快速开始来安装Kafka,然后我们继续。

    第一步,我们为了能使用Kafka连接器,需要添加Flink Kafka连接器的依赖。将这个添加的pom.xml文件的依赖模块中:

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.11</artifactId> <version>${flink.version}</version> </dependency>

    接下来,我们需要修改我们的程序。我们会移除print(),替换成使用Kafka 接收器。新的代码示例如下:

    result .map(new MapFunction<Tuple2<String,Long>, String>() { @Override public String map(Tuple2<String, Long> tuple) { return tuple.toString(); } }) .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

    也需要引入相关的类:

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.api.common.functions.MapFunction;

    注意我们首先是如何使用一个MapFunction将Tuple2<String, Long>的流转换成一个字符串流的。我们就是在做这个,因为这个对写入简单的字符串到Kafka更容易。因而,我们创建了一个Kafka接收器。你肯能要适配一下你设置的主机名和端口号。”wiki-result”是运行我们程序之前我们将会创建的Kafka 流的名字。因为我们需要一个在集群上运行的jar文件,故用Maven 命令构建这个项目:

    $ mvn clean package

    产生的jar文件会在target的子文件夹中: target/wiki-edits-0.1.jar。我们接下来会用到这个。现在我们准备安装一个Flink集群,并在其上运行写入到Kafka的程序。到你安装的Flink目录下,开启一个本地的集群:

    $ cd my/flink/directory $ bin/start-local.sh

    我们也需要创建这个Kafka Topic,以便我们的程序能写入:

    $ cd my/kafka/directory $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results

    现在我们准备在本地的Flink集群上运行我们的jar文件:

    $ cd my/flink/directory $ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar

    如果一切按照计划执行,命令行输出会跟下面的相似:

    03/08/2016 15:09:27 Job execution switched to status RUNNING. 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING

    你可以看到各个操作是如何开始运行的。只有两个操作是因为由于性能原因窗口后面的操作折叠成了一个。在Flink中,我们称这个为chaining。

    你可以用Kafka 控制台消费者通过检测Kafka主题来观察程序的输出:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result

    你可以查看运行在 http://localhost:8081上面的Flink仪表盘。你可以对你的集群资源和运行的任务有个整体的感知:

    如果你点击运行的任务,你会看到一个可以观察单个操作的视图,例如,看到执行的元素的数量:

    结束了我们的Flink之旅,如果你有如何问题,请不要犹豫在我们的Mailing Lists提问。

    转载自 并发编程网 - ifeve.com

    最新回复(0)