Hi Eleanore,

Interesting topic, thanks for the additional information. I don’t see this as 
unexpected behavior for KafkaIO since, as Heejong said before, it relies on the 
behavior of the KafkaConsumer implementation used in your case.

According to the KafkaConsumer Javadoc [1], in most cases it should read fairly 
from different partitions when one consumer handles several of them: 

“Consumption Flow Control

If a consumer is assigned multiple partitions to fetch data from, it will try 
to consume from all of them at the same time, effectively giving these 
partitions the same priority for consumption. However in some cases consumers 
may want to first focus on fetching from some subset of the assigned partitions 
at full speed, and only start fetching other partitions when these partitions 
have few or no data to consume.”

Perhaps you may want to try changing the fetch.max.bytes or 
max.partition.fetch.bytes options and see if that helps.
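As a minimal sketch of what that could look like: with Beam's KafkaIO, consumer
properties can be overridden via withConsumerConfigUpdates. The byte values
below are illustrative assumptions only, not tested recommendations; the idea
is to keep the per-partition fetch cap well below the total fetch cap so one
fetch response can carry data from several partitions.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaFetchTuning {

    // Build the consumer property overrides. Values are illustrative.
    static Map<String, Object> consumerOverrides() {
        Map<String, Object> overrides = new HashMap<>();
        // Upper bound on data returned per fetch request, across all partitions.
        overrides.put("fetch.max.bytes", 1048576);          // 1 MiB
        // Upper bound per partition per fetch; keeping this well below
        // fetch.max.bytes leaves room for multiple partitions per response.
        overrides.put("max.partition.fetch.bytes", 262144); // 256 KiB
        return overrides;
    }

    public static void main(String[] args) {
        // In a Beam pipeline this map would be applied roughly as:
        //   KafkaIO.<K, V>read().withConsumerConfigUpdates(consumerOverrides())
        System.out.println(consumerOverrides());
    }
}
```

Whether this actually evens out consumption depends on the broker-side message
sizes and the KafkaConsumer version, so it is worth testing against your setup.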


[1] 
http://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html




> On 12 May 2020, at 07:52, Eleanore Jin <[email protected]> wrote:
> 
> Hi Chamikara and Lee, 
> 
> Thanks for the information. I did more experiments on my local laptop (Flink 
> Runner in local mode, Job Manager and Task Manager run in the same JVM).
> Setup: input topic with 4 partitions.
> 1. With parallelism 1: KafkaIO will drain one partition completely to 0 lag, 
> then move on to another partition.
> 2. With parallelism 2: KafkaIO will read 2 partitions together, then move on 
> to the remaining partitions.
> 3. With parallelism 4: KafkaIO will read all 4 partitions together.
> 
> In production, we run multiple Flink Task Managers. From the consumer lag 
> reported, we also see some partitions go to 0 while other partitions remain 
> at high lag. 
> 
> Thanks!
> Eleanore
> 
> On Mon, May 11, 2020 at 8:19 PM Heejong Lee <[email protected] 
> <mailto:[email protected]>> wrote:
> If we assume that there's only one reader, all partitions are assigned to a 
> single KafkaConsumer. I think the order of reading each partition depends on 
> the KafkaConsumer implementation, i.e. how KafkaConsumer.poll() returns messages.
> 
> Reference:
> assigning partitions: 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
> polling records: 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
> creating a record batch: 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath <[email protected] 
> <mailto:[email protected]>> wrote:
> The number of partitions assigned to a given split depends on the 
> desiredNumSplits value provided by the runner.
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
> 
> (This is assuming that you are using the Beam Kafka source, not a native Flink 
> override).
> 
> Do you see the same behavior when you increase the number of workers in your 
> Flink cluster?
> 
> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin <[email protected] 
> <mailto:[email protected]>> wrote:
> Hi community, 
> 
> In my pipeline, I am using KafkaIO to read and write. The source topic has 4 
> partitions and the pipeline parallelism is 1. 
> 
> I noticed from the consumer lag metrics that it consumes from one partition 
> until all the messages from that partition are processed, then it consumes 
> from another partition. 
> 
> Is this the expected behavior? 
> 
> Runner is Flink. 
> 
> Thanks a lot! 
> Eleanore 
