If we assume that there is only one reader, all partitions are assigned to a
single KafkaConsumer. I think the order in which the partitions are read then
depends on the KafkaConsumer implementation, i.e. how KafkaConsumer.poll()
returns messages.
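
To make that concrete, here is a minimal sketch against the plain Kafka
consumer API (not Beam code; the broker address, topic name, and partition
count are made up). A single consumer assigned several partitions just prints
whatever mix of partitions each poll() happens to return:

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PollOrderDemo {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          // Manually assign all 4 partitions of a hypothetical topic to this
          // one consumer, mirroring the single-reader case above.
          consumer.assign(Arrays.asList(
              new TopicPartition("my-topic", 0),
              new TopicPartition("my-topic", 1),
              new TopicPartition("my-topic", 2),
              new TopicPartition("my-topic", 3)));
          while (true) {
            ConsumerRecords<byte[], byte[]> records =
                consumer.poll(Duration.ofMillis(500));
            // poll() makes no fairness guarantee across partitions: a batch
            // may contain records from only a subset of what is assigned.
            for (TopicPartition tp : records.partitions()) {
              System.out.println(
                  tp + " -> " + records.records(tp).size() + " records");
            }
          }
        }
      }
    }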

Reference:
assigning partitions:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
polling records:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
creating a record batch:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
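
For the assignment step itself (the first link above, and the
desiredNumSplits point Chamikara makes below), here is a simplified sketch of
the round-robin distribution across splits. This helper is hypothetical, not
the actual Beam method:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    public class SplitSketch {
      // Hypothetical helper: spreads partitions round-robin over numSplits
      // readers, roughly what KafkaUnboundedSource's split logic does.
      static List<List<TopicPartition>> assignPartitions(
          List<TopicPartition> partitions, int numSplits) {
        List<List<TopicPartition>> assignments = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
          assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
          assignments.get(i % numSplits).add(partitions.get(i));
        }
        return assignments;
      }
    }

With numSplits = 1 (pipeline parallelism 1), all four partitions land in
assignments.get(0), i.e. one reader and one KafkaConsumer for everything.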

On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath <[email protected]>
wrote:

> The number of partitions assigned to a given split depends on the
> desiredNumSplits value provided by the runner.
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>
> (This is assuming that you are using the Beam Kafka source, not a native
> Flink override.)
>
> Do you see the same behavior when you increase the number of workers in
> your Flink cluster?
>
> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin <[email protected]>
> wrote:
>
>> Hi community,
>>
>> In my pipeline, I am using KafkaIO to read and write. The source topic
>> has 4 partitions and pipeline parallelism is 1.
>>
>> I noticed from the consumer lag metrics that it consumes from one
>> partition until all the messages from that partition are processed, and
>> then it consumes from another partition.
>>
>> Is this the expected behavior?
>>
>> Runner is Flink.
>>
>> Thanks a lot!
>> Eleanore
>>
>
