How the Flink Kafka consumer ensures each partition is read by exactly one thread

xiaoxiao, 2025-02-03

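When Flink reads from Kafka, by default each Kafka partition is consumed by exactly one parallel subtask (thread). A single subtask may still read several partitions. How is this implemented, and how can we reuse the idea when writing our own RichParallelSourceFunction? Let's walk through it, based on Flink 1.8.

If you have read the source of the Flink Kafka connector, you will know FlinkKafkaConsumerBase. If you haven't, that's fine, because we will go through it together. Here is the key part of its open method: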

```java
// get this subtask's partitions of the fixed topics or the topic pattern
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
```

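This call is the entry point for understanding how the consumer ensures that each partition is read by exactly one thread. Its implementation in AbstractPartitionDiscoverer looks like this: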

```java
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    if (!closed && !wakeup) {
        try {
            ...
            // (2) eliminate partition that are old partitions or should not be subscribed by this subtask
            if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
            } else {
                Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                KafkaTopicPartition nextPartition;
                while (iter.hasNext()) {
                    nextPartition = iter.next();
                    // drop partitions already discovered in an earlier round,
                    // and keep only the partitions that belong to this subtask
                    if (!setAndCheckDiscoveredPartition(nextPartition)) {
                        iter.remove();
                    }
                }
            }

            return newDiscoveredPartitions;
            ...
}
```
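The following plain-Java sketch shows what this loop does across repeated discovery rounds. It needs neither Kafka nor Flink. Everything in it is hypothetical. Partitions are plain strings such as "orders-0". The class name DiscoveryFilterSketch is made up. The ownership test is a placeholder predicate, not Flink's real assignment rule, which is covered next. Like the original, the sketch records every newly seen partition before it checks ownership. As a result, a later round only returns partitions that were added after the earlier rounds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class DiscoveryFilterSketch {

    // partitions already seen in earlier discovery rounds (owned or not)
    private final Set<String> discovered = new HashSet<>();
    // hypothetical stand-in for "does this partition belong to this subtask?"
    private final Predicate<String> ownedByThisSubtask;

    public DiscoveryFilterSketch(Predicate<String> ownedByThisSubtask) {
        this.ownedByThisSubtask = ownedByThisSubtask;
    }

    // Mirrors setAndCheckDiscoveredPartition: remember the partition, then test ownership.
    boolean setAndCheck(String partition) {
        if (discovered.add(partition)) { // add() returns false if it was already present
            return ownedByThisSubtask.test(partition);
        }
        return false;
    }

    // Mirrors the in-place filtering loop of discoverPartitions().
    public List<String> filter(List<String> fetched) {
        List<String> result = new ArrayList<>(fetched);
        Iterator<String> iter = result.iterator();
        while (iter.hasNext()) {
            if (!setAndCheck(iter.next())) {
                iter.remove();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // placeholder rule: this subtask owns even-numbered partitions
        DiscoveryFilterSketch d = new DiscoveryFilterSketch(
                p -> Integer.parseInt(p.substring(p.lastIndexOf('-') + 1)) % 2 == 0);
        // first round: prints [orders-0, orders-2]
        System.out.println(d.filter(Arrays.asList("orders-0", "orders-1", "orders-2")));
        // second round, after partitions 3 and 4 were added: prints [orders-4]
        System.out.println(d.filter(Arrays.asList("orders-0", "orders-1", "orders-2", "orders-3", "orders-4")));
    }
}
```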

The key part is the setAndCheckDiscoveredPartition method, so let's step into it:

```java
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    if (isUndiscoveredPartition(partition)) {
        discoveredPartitions.add(partition);

        // each Kafka partition maps to exactly one subtask index
        return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
    }

    return false;
}
```

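indexOfThisSubtask is the index of the subtask that the current thread runs, and numParallelSubtasks is the total number of parallel subtasks. isUndiscoveredPartition skips any partition that was already seen in an earlier discovery round. For a new partition, the method returns true only if the partition belongs to the subtask with index indexOfThisSubtask. Now let's look at how the assignment is actually computed: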

```java
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
```

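The owning subtask is computed from the topic and the partition id. The hash of the topic name picks a start index, the partition id is added to it as an offset, and the sum is taken modulo numParallelSubtasks. This spreads one topic's partitions round-robin over the subtasks. The result depends only on these inputs, so every subtask can evaluate the same function independently. For any given partition, exactly one subtask gets true, and no coordination between subtasks is needed. It also follows that one subtask may own several partitions, and that some subtasks own none if the parallelism is larger than the number of partitions.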
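You can try the arithmetic outside Flink. The sketch below copies the formula from the assign excerpt but takes the topic name and partition id as plain arguments instead of a KafkaTopicPartition. The class AssignSketch and its layout helper are hypothetical names used for illustration only. For each subtask, the sketch lists the partitions that subtask would own.

```java
import java.util.ArrayList;
import java.util.List;

public class AssignSketch {

    // Same arithmetic as KafkaTopicPartitionAssigner.assign, but with plain arguments.
    public static int assign(String topic, int partition, int numParallelSubtasks) {
        // mask off the sign bit so the modulo result is never negative
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // partition ids are used as offsets from the start index
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Groups partitions 0..numPartitions-1 of one topic by the subtask that owns them.
    public static List<List<Integer>> layout(String topic, int numPartitions, int numParallelSubtasks) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < numParallelSubtasks; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(assign(topic, p, numParallelSubtasks)).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // 5 partitions, parallelism 3: every partition appears in exactly one list
        System.out.println(layout("orders", 5, 3));
        // 2 partitions, parallelism 4: two of the four lists stay empty
        System.out.println(layout("orders", 2, 4));
    }
}
```

Consecutive partition ids land on consecutive subtasks. Within one topic, the number of partitions per subtask therefore differs by at most one. Which subtask gets partition 0 depends on the topic's hash, so it varies from topic to topic.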

So how can we borrow this idea when we define our own RichParallelSourceFunction? Straight to the code:

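```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class WordSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {

    // volatile so that cancel(), called from another thread, is seen by run()
    private volatile boolean isRun = true;

    @Override
    public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
        long start = 0;
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        while (isRun) {
            start += 1;
            // same idea as the Kafka assigner: each value is owned by exactly one subtask
            if (start % numberOfParallelSubtasks == indexOfThisSubtask) {
                // emit under the checkpoint lock so emission is atomic with respect to checkpoints
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(new Tuple2<>(start, 1L));
                }
                Thread.sleep(1000);
                System.out.println("Thread.currentThread().getName()=========== " + Thread.currentThread().getName());
            }
        }
    }

    @Override
    public void cancel() {
        isRun = false;
    }
}
```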
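You can check the ownership rule without starting a Flink job. The sketch below uses a hypothetical helper, ModuloSplitSketch, which is not part of Flink. It applies WordSource's `start % numberOfParallelSubtasks == indexOfThisSubtask` test to a bounded range of numbers instead of looping until cancel(). This confirms that every value is emitted by exactly one subtask.

```java
import java.util.ArrayList;
import java.util.List;

public class ModuloSplitSketch {

    // Values that subtask `index` (out of `parallelism`) would emit for 1..limit,
    // using the same modulo test as WordSource.
    public static List<Long> valuesFor(int index, int parallelism, long limit) {
        List<Long> out = new ArrayList<>();
        for (long start = 1; start <= limit; start++) {
            if (start % parallelism == index) {
                out.add(start);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        // prints: subtask 0 -> [3, 6, 9], subtask 1 -> [1, 4, 7, 10], subtask 2 -> [2, 5, 8]
        for (int index = 0; index < parallelism; index++) {
            System.out.println("subtask " + index + " -> " + valuesFor(index, parallelism, 10));
        }
    }
}
```

Each subtask still walks through every number and skips the ones it does not own. Having each subtask start at its own offset and step by the parallelism would avoid that wasted work, while the modulo test stays closer to how the Kafka assigner is written.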

Ta-da! Our own RichParallelSourceFunction can now emit data in parallel too, and each value is produced by exactly one subtask.
