Hi Alex,

Thanks a lot for the suggestion. It seems that in my previous experiment I did not pre-ingest enough messages. It looks like each partition gets a slice of time to be consumed by the same consumer, and maybe during partition 1's time slice its lag had already drained to zero, hence the observation.
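The time-slice explanation above can be illustrated with a toy model. This is a hypothetical simplification, not how KafkaConsumer actually schedules fetches: it only assumes that one consumer serves its assigned partitions in turn and takes at most maxPerTurn records (a made-up parameter) from a partition on each turn. The names TimeSliceModel and lagAfterTurns are illustrative only.

```java
import java.util.Arrays;

/**
 * Hypothetical toy model, NOT KafkaConsumer's real fetch scheduling:
 * one consumer serves its partitions in turn and takes at most
 * maxPerTurn records from a partition on each turn.
 */
class TimeSliceModel {

  /** Returns the remaining lag per partition after the given number of turns. */
  static long[] lagAfterTurns(long[] backlog, long maxPerTurn, int turns) {
    long[] lag = Arrays.copyOf(backlog, backlog.length);
    if (lag.length == 0) {
      return lag;
    }
    int p = 0;
    for (int t = 0; t < turns; t++) {
      // Skip partitions whose lag is already zero.
      int skipped = 0;
      while (lag[p] == 0 && skipped < lag.length) {
        p = (p + 1) % lag.length;
        skipped++;
      }
      if (lag[p] == 0) {
        break; // every partition is fully consumed
      }
      // Consume up to maxPerTurn records from this partition, then move on.
      lag[p] -= Math.min(lag[p], maxPerTurn);
      p = (p + 1) % lag.length;
    }
    return lag;
  }

  public static void main(String[] args) {
    // Small backlog: each partition is emptied within a single turn,
    // so lag drops to 0 one partition at a time.
    System.out.println(Arrays.toString(lagAfterTurns(new long[] {500, 500, 500, 500}, 1000, 2)));
    // Large backlog: after one full round, every partition has made progress.
    System.out.println(Arrays.toString(lagAfterTurns(new long[] {100_000, 100_000, 100_000, 100_000}, 1000, 4)));
  }
}
```

In this model the per-turn cap stands in for whatever bounds a single fetch; it only shows why the same consumer can look sequential with a small backlog ([0, 0, 500, 500] after two turns) and concurrent with a large one.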
I tried ingesting more data, and now I see all of the partitions making progress. I will post an update if I have more findings.

Thanks a lot!
Eleanore

On Tue, May 12, 2020 at 10:44 AM Alexey Romanenko wrote:

> Hi Eleanore,
>
> Interesting topic, thank you for the additional information. I don’t think
> this is unexpected behavior for KafkaIO since, as Heejong said before, it
> relies on the implementation of KafkaConsumer that is used in your case.
>
> According to the KafkaConsumer Javadoc [1], in most cases it should read
> fairly from different partitions when one consumer handles several of them:
>
> “Consumption Flow Control
>
> If a consumer is assigned multiple partitions to fetch data from, it will
> try to consume from all of them at the same time, effectively giving these
> partitions the same priority for consumption. However in some cases
> consumers may want to first focus on fetching from some subset of the
> assigned partitions at full speed, and only start fetching other partitions
> when these partitions have few or no data to consume.”
>
> Perhaps you could try changing the fetch.max.bytes or
> max.partition.fetch.bytes options and see if that helps.
>
> [1]
> http://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
> On 12 May 2020, at 07:52, Eleanore Jin wrote:
>
> Hi Chamikara and Lee,
>
> Thanks for the information. I did more experiments on my local laptop
> (Flink Runner local mode; the Job Manager and Task Manager run in the
> same JVM).
> Setup: input topic with 4 partitions.
> 1. With parallelism 1: KafkaIO read drains 1 partition completely to 0
> lag, then moves to another partition.
> 2. With parallelism 2: KafkaIO read reads 2 partitions together, then
> moves to the rest of the partitions.
> 3. With parallelism 4: KafkaIO read reads all 4 partitions together.
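The parallelism experiments above are consistent with a single reader (and so a single KafkaConsumer) owning all 4 partitions at parallelism 1. The sketch below models how partitions might be spread across splits as a simple round-robin assignment. It is a simplified illustration, not Beam's KafkaUnboundedSource code; it assumes the runner's parallelism ends up as desiredNumSplits, and the names PartitionSplits and assign are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, hypothetical model of spreading topic partitions across
 * splits round-robin when the runner asks for desiredNumSplits splits.
 */
class PartitionSplits {

  static List<List<Integer>> assign(int numPartitions, int desiredNumSplits) {
    // Never create more splits than partitions, and always at least one.
    int numSplits = Math.max(1, Math.min(desiredNumSplits, numPartitions));
    List<List<Integer>> splits = new ArrayList<>(numSplits);
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    // Partition p goes to split p % numSplits.
    for (int p = 0; p < numPartitions; p++) {
      splits.get(p % numSplits).add(p);
    }
    return splits;
  }

  public static void main(String[] args) {
    // Mirrors the 4-partition experiments with parallelism 1, 2 and 4.
    for (int parallelism : new int[] {1, 2, 4}) {
      System.out.println("parallelism " + parallelism + " -> " + assign(4, parallelism));
    }
  }
}
```

Under this model, parallelism 1 gives one split [[0, 1, 2, 3]], parallelism 2 gives [[0, 2], [1, 3]], and parallelism 4 gives one partition per split, so how partitions within one split are interleaved is left entirely to that split's consumer.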
>
> In production, we run multiple Flink Task Managers. From the reported
> consumer lag, we also see some partitions go to 0 while other
> partitions remain at high lag.
>
> Thanks!
> Eleanore
>
> On Mon, May 11, 2020 at 8:19 PM Heejong Lee wrote:
>
>> If we assume that there's only one reader, all partitions are assigned to
>> a single KafkaConsumer. I think the order of reading each partition depends
>> on the KafkaConsumer implementation, i.e. how KafkaConsumer.poll() returns
>> messages.
>>
>> Reference:
>> assigning partitions:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
>> polling records:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
>> creating a record batch:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
>>
>> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath wrote:
>>
>>> The number of partitions assigned to a given split depends on the
>>> desiredNumSplits value provided by the runner.
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>>
>>> (This assumes that you are using the Beam Kafka source, not a native
>>> Flink override.)
>>>
>>> Do you see the same behavior when you increase the number of workers in
>>> your Flink cluster?
>>>
>>> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin wrote:
>>>
>>>> Hi community,
>>>>
>>>> In my pipeline, I am using KafkaIO to read and write. The source topic
>>>> has 4 partitions and pipeline parallelism is 1.
>>>>
>>>> I noticed from the consumer lag metrics that it consumes from 1 partition
>>>> until all the messages from that partition are processed, and only then
>>>> consumes from another partition.
>>>>
>>>> Is this the expected behavior?
>>>>
>>>> Runner is Flink.
>>>>
>>>> Thanks a lot!
>>>> Eleanore
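Alexey's suggestion to experiment with fetch.max.bytes or max.partition.fetch.bytes can be tried by passing consumer property overrides to the KafkaIO read, for example through KafkaIO.Read's withConsumerConfigUpdates(Map<String, Object>). The sketch below only builds that override map, using the plain Kafka property names as strings so it runs without the Kafka client on the classpath. The byte values are hypothetical starting points, not recommendations (the Kafka defaults are 1 MiB for max.partition.fetch.bytes and 50 MiB for fetch.max.bytes), and whether smaller values make consumption across partitions look more even is something to measure, not a given. FetchTuning and consumerConfigUpdates are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Builds Kafka consumer property overrides for experimenting with fetch sizes.
 * The values passed in are hypothetical; tune and measure in your own setup.
 */
class FetchTuning {

  static Map<String, Object> consumerConfigUpdates(int maxPartitionFetchBytes, int fetchMaxBytes) {
    if (maxPartitionFetchBytes <= 0 || fetchMaxBytes <= 0) {
      throw new IllegalArgumentException("fetch sizes must be positive");
    }
    Map<String, Object> updates = new HashMap<>();
    // Max data per partition the broker returns in one fetch (a soft limit).
    updates.put("max.partition.fetch.bytes", maxPartitionFetchBytes);
    // Max data the broker returns for one fetch request across all partitions (a soft limit).
    updates.put("fetch.max.bytes", fetchMaxBytes);
    return updates;
  }

  public static void main(String[] args) {
    // Hypothetical values: 256 KiB per partition, 1 MiB per fetch request.
    Map<String, Object> updates = consumerConfigUpdates(256 * 1024, 1024 * 1024);
    System.out.println(updates);
    // In a Beam pipeline, this map could be handed to
    // KafkaIO.read()...withConsumerConfigUpdates(updates).
  }
}
```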
