If we assume that there's only one reader, all partitions are assigned to a single KafkaConsumer. I think the order in which the partitions are read then depends on the KafkaConsumer implementation, i.e. on how KafkaConsumer.poll() returns messages.
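To make that concrete, here is a toy model (not the real Kafka client, and not Beam's reader) of a single consumer holding buffered records for several partitions, where each poll() drains partitions in buffer order. Under this assumption, if one partition keeps having data buffered, poll() keeps returning batches from it before touching the others, which would look like the partition-at-a-time consumption described below. All names and the batching policy here are hypothetical simplifications.

```java
import java.util.*;

// Toy model of a single reader with several assigned partitions.
// Each poll() drains whichever partition's buffer it reaches first,
// loosely mimicking how one fetch response can be dominated by a
// single partition. This is an illustrative sketch, not Kafka's
// actual fetch logic.
public class SingleReaderPollSketch {
    // One in-memory queue of buffered records per partition (hypothetical stand-in).
    static Map<Integer, Deque<String>> buffers = new LinkedHashMap<>();

    // Simplified poll(): returns up to maxRecords, draining partitions in buffer order.
    static List<String> poll(int maxRecords) {
        List<String> batch = new ArrayList<>();
        for (Deque<String> q : buffers.values()) {
            while (!q.isEmpty() && batch.size() < maxRecords) {
                batch.add(q.pollFirst());
            }
            if (batch.size() >= maxRecords) break;
        }
        return batch;
    }

    public static void main(String[] args) {
        // Four partitions with three buffered records each.
        for (int p = 0; p < 4; p++) {
            Deque<String> q = new ArrayDeque<>();
            for (int i = 0; i < 3; i++) q.add("p" + p + "-m" + i);
            buffers.put(p, q);
        }
        // Each poll of 3 records drains one partition entirely before the next.
        System.out.println(poll(3)); // [p0-m0, p0-m1, p0-m2]
        System.out.println(poll(3)); // [p1-m0, p1-m1, p1-m2]
    }
}
```

In this toy model the apparent "one partition at a time" behavior falls out of batching alone, without any deliberate ordering policy.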
Reference:
assigning partitions: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
polling records: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
creating a record batch: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614

On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath <[email protected]> wrote:

> The number of partitions assigned to a given split depends on the
> desiredNumSplits value provided by the runner.
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>
> (This is assuming that you are using the Beam Kafka source, not a native
> Flink override.)
>
> Do you see the same behavior when you increase the number of workers of
> your Flink cluster?
>
> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin <[email protected]>
> wrote:
>
>> Hi community,
>>
>> In my pipeline, I am using KafkaIO to read and write. The source topic
>> has 4 partitions and the pipeline parallelism is 1.
>>
>> I noticed from the consumer lag metrics that it consumes from one
>> partition until all the messages from that partition are processed,
>> and only then consumes from another partition.
>>
>> Is this the expected behavior?
>>
>> The runner is Flink.
>>
>> Thanks a lot!
>> Eleanore
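To illustrate the split assignment discussed above, here is a small self-contained sketch of round-robin distribution of topic partitions over splits, in the spirit of KafkaUnboundedSource (linked above). The method name and shapes here are hypothetical, not Beam's API; the point is that with parallelism 1 (desiredNumSplits == 1) all four partitions land in the single split, so one KafkaConsumer ends up reading everything.

```java
import java.util.*;

// Sketch (not Beam's actual code) of distributing topic partitions
// over splits round-robin by partition index. With desiredNumSplits
// == 1, the single split receives every partition, matching the
// parallelism-1 scenario in the thread.
public class PartitionSplitSketch {
    static List<List<Integer>> assign(List<Integer> partitions, int desiredNumSplits) {
        int numSplits = Math.min(desiredNumSplits, partitions.size());
        List<List<Integer>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) splits.add(new ArrayList<>());
        // Round-robin: partition i goes to split (i % numSplits).
        for (int i = 0; i < partitions.size(); i++) {
            splits.get(i % numSplits).add(partitions.get(i));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Integer> partitions = List.of(0, 1, 2, 3); // the 4-partition topic from the thread
        System.out.println(assign(partitions, 1)); // [[0, 1, 2, 3]] -> one reader gets all partitions
        System.out.println(assign(partitions, 2)); // [[0, 2], [1, 3]]
    }
}
```

Increasing the runner-provided parallelism (e.g. more Flink task slots) raises desiredNumSplits, so partitions spread across more readers and the per-partition lag pattern should change.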
