Flink Official Documentation: Batch Examples (Part 1)

    xiaoxiao · 2024-01-13

    Batch Examples

    The following programs demonstrate different Flink applications, from simple word frequency counting to graph algorithms. The code shows how to use Flink's DataSet API.

    The complete source code for these and many more examples can be found in the flink-examples-batch and flink-examples-streaming modules of the Flink source code.

    Running an Example

    To run a Flink example, we assume you have a running Flink instance available. The "Quickstart" and "Setup" entries in the navigation describe several ways of starting Flink.

    The easiest way is to run the script ./bin/start-local.sh, which starts a local JobManager.

    Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.

    To run the WordCount example, execute the following command:

    ./bin/flink run ./examples/batch/WordCount.jar

    The other examples can be started in a similar way.

    If an example is run without arguments, default parameters are used. To run WordCount with real data, pass the path to the data:

    ./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result

    Note that non-local file systems require a scheme prefix, such as hdfs://.

    Word Count

    Word count is the "hello world" of big data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: first, the text is split into individual words; second, the words are grouped and counted.
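The two steps can be sketched in plain Java without Flink (the class and method names here are illustrative and not part of the Flink API; the Flink program follows below):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {

    // Hypothetical standalone sketch of the two-step algorithm.
    public static Map<String, Integer> countWords(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        // Step 1: normalize and split the text into words.
        for (String token : text.toLowerCase().split("\\W+")) {
            if (token.isEmpty()) {
                continue;
            }
            // Step 2: group identical words and sum their counts.
            counts.merge(token, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("To be, or not to be"));
    }
}
```

In the Flink version, step 1 becomes a flatMap and step 2 becomes a groupBy followed by a sum, so the same logic scales out over a cluster.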

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = env.readTextFile("/path/to/file");

    DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word, 1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(0)
            .sum(1);

    counts.writeAsCsv(outputPath, "\n", " ");

    // User-defined functions
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    The WordCount example implements the algorithm described above. It requires two input parameters: --input and --output. Any text file can be used as test data.

    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.readTextFile("/path/to/file")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.writeAsCsv(outputPath, "\n", " ")

    Page Rank

    The PageRank algorithm computes the importance of pages in a graph, where links from one page to another define the graph's edges. It is an iterative algorithm, meaning that the same computation is applied repeatedly. In each iteration, every page distributes its current rank equally among its neighbors, and computes its new rank as a weighted sum of the ranks it receives from its neighbors. The PageRank algorithm became well known through the Google search engine, which uses it to rank the results of web search queries.
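The per-iteration update can be sketched in plain Java on a tiny in-memory graph (a hypothetical standalone example, not the Flink program; the damping value 0.85 is an illustrative assumption):

```java
public class PageRankSketch {

    static final double DAMPING = 0.85;

    // One PageRank iteration: links[i] lists the pages that page i points to.
    public static double[] iterate(double[] ranks, int[][] links) {
        int n = ranks.length;
        double[] next = new double[n];
        // Each page contributes an equal share of its rank to every neighbor.
        for (int page = 0; page < n; page++) {
            double share = ranks[page] / links[page].length;
            for (int target : links[page]) {
                next[target] += share;
            }
        }
        // Apply the damping factor plus the uniform "random jump" term.
        for (int page = 0; page < n; page++) {
            next[page] = DAMPING * next[page] + (1 - DAMPING) / n;
        }
        return next;
    }

    public static void main(String[] args) {
        // Three pages: 0 -> {1, 2}, 1 -> {2}, 2 -> {0}, all starting at rank 1/3.
        double[] ranks = {1.0 / 3, 1.0 / 3, 1.0 / 3};
        int[][] links = {{1, 2}, {2}, {0}};
        double[] next = iterate(ranks, links);
        System.out.printf("%.4f %.4f %.4f%n", next[0], next[1], next[2]);
    }
}
```

The damped update mirrors the Dampener function in the Flink program below: new rank = rank * dampening + (1 - dampening) / numPages, so ranks always sum to 1.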

    In this example, PageRank is implemented as a bulk iteration with a fixed number of iterations.

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read the pages and initial ranks by parsing a CSV file
    DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
            .types(Long.class, Double.class);

    // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
    DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

    // set iterative data set
    IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

    DataSet<Tuple2<Long, Double>> newRanks = iteration
            // join pages with outgoing edges and distribute rank
            .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
            // collect and sum ranks
            .groupBy(0).sum(1)
            // apply dampening factor
            .map(new Dampener(DAMPENING_FACTOR, numPages));

    DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
            newRanks,
            newRanks.join(iteration).where(0).equalTo(0)
            // termination condition
            .filter(new EpsilonFilter()));

    finalPageRanks.writeAsCsv(outputPath, "\n", " ");

    // User-defined functions

    public static final class JoinVertexWithEdgesMatch
            implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>, Tuple2<Long, Double>> {

        @Override
        public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                         Collector<Tuple2<Long, Double>> out) {
            Long[] neighbors = adj.f1;
            double rank = page.f1;
            double rankToDistribute = rank / ((double) neighbors.length);

            for (int i = 0; i < neighbors.length; i++) {
                out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
            }
        }
    }

    public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
        private final double dampening, randomJump;

        public Dampener(double dampening, double numVertices) {
            this.dampening = dampening;
            this.randomJump = (1 - dampening) / numVertices;
        }

        @Override
        public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
            value.f1 = (value.f1 * dampening) + randomJump;
            return value;
        }
    }

    public static final class EpsilonFilter
            implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

        @Override
        public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
            return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
        }
    }

    The PageRank program implements the example above. It requires the following parameters: --pages --links --output --numPages --iterations.

    Scala

    // User-defined types
    case class Link(sourceId: Long, targetId: Long)
    case class Page(pageId: Long, rank: Double)
    case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read the pages and initial ranks by parsing a CSV file
    val pages = env.readCsvFile[Page](pagesInputPath)

    // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
    val links = env.readCsvFile[Link](linksInputPath)

    // assign initial ranks to pages
    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))

    // build adjacency list from link input
    val adjacencyLists = links
      // initialize lists
      .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
      // concatenate lists
      .groupBy("sourceId").reduce {
        (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
      }

    // start iteration
    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
      currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
            (page, adjacent, out: Collector[Page]) =>
              for (targetId <- adjacent.targetIds) {
                out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
              }
          }
          // collect ranks and sum them up
          .groupBy("pageId").aggregate(SUM, "rank")
          // apply dampening factor
          .map { p =>
            Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
          }

        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current, next, out: Collector[Int]) =>
            // check for significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }

        (newRanks, termination)
    }

    val result = finalRanks

    // emit result
    result.writeAsCsv(outputPath, "\n", " ")

    Input files must be plain text files in the following format:

    --pages: pages are represented by long IDs separated by newline characters. For example, "1\n2\n12\n42\n63\n" specifies five pages with IDs 1, 2, 12, 42, and 63.

    --links: links are pairs of page IDs. Within a pair, the IDs are separated by a space; pairs are separated by newline characters. For example, "1 2\n2 12\n1 12\n42 63\n" specifies the four directed links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).

    For this simple implementation, every page must have at least one incoming link and at least one outgoing link. A page may link to itself.
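The two formats above can be parsed with a few lines of plain Java (a hypothetical standalone sketch; the Flink programs read these files through readCsvFile and an adjacency-list builder instead):

```java
import java.util.ArrayList;
import java.util.List;

public class InputFormatSketch {

    // Parse the pages format: one long ID per line.
    public static List<Long> parsePages(String text) {
        List<Long> pages = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (!line.isEmpty()) {
                pages.add(Long.parseLong(line));
            }
        }
        return pages;
    }

    // Parse the links format: "source target" pairs, one per line.
    public static List<long[]> parseLinks(String text) {
        List<long[]> links = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (line.isEmpty()) {
                continue;
            }
            String[] parts = line.split(" ");
            links.add(new long[]{Long.parseLong(parts[0]), Long.parseLong(parts[1])});
        }
        return links;
    }

    public static void main(String[] args) {
        // The example inputs from the text above.
        System.out.println(parsePages("1\n2\n12\n42\n63\n"));
        System.out.println(parseLinks("1 2\n2 12\n1 12\n42 63\n").size());
    }
}
```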
