Hi Eleanore,

Interesting topic, thank you for the additional information. I don't think this is unexpected behavior for KafkaIO since, as Heejong said before, it relies on the implementation of the KafkaConsumer that is used in your case.
According to the KafkaConsumer Javadoc [1], in most cases it should read fairly from the different partitions when one consumer handles several of them (see “Consumption Flow Control”):

“If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.”

Perhaps you may want to try changing the fetch.max.bytes or max.partition.fetch.bytes options and see if that helps; a rough sketch of how these could be passed to KafkaIO is at the bottom of this message, below the quoted thread.

[1] http://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

> On 12 May 2020, at 07:52, Eleanore Jin <[email protected]> wrote:
>
> Hi Chamikara and Lee,
>
> Thanks for the information, I did more experiments on my local laptop (Flink Runner local mode, Job Manager and Task Manager run in the same JVM).
> Setup: input topic with 4 partitions
> 1. with parallelism 1: KafkaIO read will drain 1 partition completely to 0 lag, then move to another partition
> 2. with parallelism 2: KafkaIO read will read 2 partitions together, then move to the rest of the partitions
> 3. with parallelism 4: KafkaIO read will read all 4 partitions together.
>
> In production, we run multiple Flink Task Managers; from the consumer lag reported, we also see some partitions go to 0 while other partitions remain at a high lag.
>
> Thanks!
> Eleanore
>
> On Mon, May 11, 2020 at 8:19 PM Heejong Lee <[email protected]> wrote:
> If we assume that there's only one reader, all partitions are assigned to a single KafkaConsumer. I think the order of reading each partition depends on the KafkaConsumer implementation, i.e. how KafkaConsumer.poll() returns messages.
>
> Reference:
> assigning partitions:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
> polling records:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
> creating a record batch:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
>
> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath <[email protected]> wrote:
> The number of partitions assigned to a given split depends on the desiredNumSplits value provided by the runner.
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>
> (This is assuming that you are using the Beam Kafka source, not a native Flink override.)
>
> Do you see the same behavior when you increase the number of workers of your Flink cluster?
>
> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin <[email protected]> wrote:
> Hi community,
>
> In my pipeline, I am using KafkaIO to read and write. The source topic has 4 partitions and the pipeline parallelism is 1.
>
> I noticed from the consumer lag metrics that it will consume from 1 partition until all the messages from that partition are processed, then it will consume from another partition.
>
> Is this the expected behavior?
>
> The runner is Flink.
>
> Thanks a lot!
> Eleanore
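As promised above, here is a rough sketch of how those consumer options could be passed to KafkaIO via withConsumerConfigUpdates() (available in recent Beam releases). The broker address, topic name, deserializers and the byte values below are placeholders for illustration only, not recommendations for your setup:

import java.util.Map;

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaFetchTuningSketch {

  // Builds a KafkaIO read with smaller fetch sizes than the Kafka defaults.
  public static KafkaIO.Read<byte[], byte[]> buildRead() {
    return KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("broker-1:9092")               // placeholder broker address
        .withTopic("my-input-topic")                         // placeholder topic (4 partitions in your case)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        // Smaller fetch sizes mean each poll() returns less data per partition,
        // which may spread consumption more evenly across partitions.
        // The numbers are illustrative starting points only.
        .withConsumerConfigUpdates(Map.<String, Object>of(
            ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 256 * 1024,  // default is 1 MB
            ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 4 * 1024 * 1024));     // default is 50 MB
  }
}

You could plug this Read into your pipeline in place of your current KafkaIO.read() configuration and watch how the per-partition consumer lag changes; this only shows where the options go, not that these particular values will fix the skew.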
