Hi Alex,

Thanks a lot for the suggestion. It seems that in my previous experiment
I did not pre-ingest enough messages. So it looks like each
partition gets a slice of time to be consumed by the same consumer, and
maybe during partition 1's time slice its lag had already drained to zero,
hence the observation.

I tried ingesting more data, and now I see all of the partitions making
progress. I will update if I have more findings.

Thanks a lot!
Eleanore

On Tue, May 12, 2020 at 10:44 AM Alexey Romanenko <[email protected]>
wrote:

> Hi Eleanore,
>
> Interesting topic, thank you for the additional information. I don't think
> this is unexpected behavior for KafkaIO since, as Heejong said before, it
> relies on the implementation of the KafkaConsumer that is used in your case.
>
> According to the KafkaConsumer Javadoc [1], in most cases it should read
> fairly from different partitions when one consumer handles several of
> them:
>
> “Consumption Flow Control
>
> If a consumer is assigned multiple partitions to fetch data from, it will
> try to consume from all of them at the same time, effectively giving these
> partitions the same priority for consumption. However in some cases
> consumers may want to first focus on fetching from some subset of the
> assigned partitions at full speed, and only start fetching other partitions
> when these partitions have few or no data to consume.”
>
> Perhaps you could try changing the
> fetch.max.bytes or max.partition.fetch.bytes options and see if that
> helps.
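To make the two settings concrete, here is a rough Java sketch built on a deliberately simplified fetch model (an assumption, not Kafka's actual broker logic): max.partition.fetch.bytes caps what one fetch takes from each partition, and fetch.max.bytes caps the whole fetch. The class and method names are hypothetical. The config map only shows how the overrides might be handed to Beam, assuming a Beam version whose KafkaIO.Read offers withConsumerConfigUpdates(Map<String, Object>).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model (an assumption, not Kafka's real broker logic): it ignores
// record-batch boundaries, the order in which partitions are listed in each
// fetch, and the rule that an oversized first batch is still returned.
class FetchCapSketch {

    // Bytes taken from each partition in one simulated fetch, filling
    // partitions in array order until the total budget is spent.
    static long[] simulateFetch(long[] backlogBytes, long maxPartitionFetchBytes, long fetchMaxBytes) {
        long[] taken = new long[backlogBytes.length];
        long remaining = fetchMaxBytes;
        for (int p = 0; p < backlogBytes.length && remaining > 0; p++) {
            taken[p] = Math.min(backlogBytes[p], Math.min(maxPartitionFetchBytes, remaining));
            remaining -= taken[p];
        }
        return taken;
    }

    // Overrides keyed by the real Kafka consumer config names; in Beam they could
    // be passed to KafkaIO.read().withConsumerConfigUpdates(...) (assumes a Beam
    // version that has that method).
    static Map<String, Object> consumerConfigUpdates(int maxPartitionFetchBytes, int fetchMaxBytes) {
        Map<String, Object> updates = new LinkedHashMap<>();
        updates.put("max.partition.fetch.bytes", maxPartitionFetchBytes);
        updates.put("fetch.max.bytes", fetchMaxBytes);
        return updates;
    }

    public static void main(String[] args) {
        long[] backlog = {4_000_000L, 4_000_000L, 4_000_000L, 4_000_000L}; // 4 partitions, deep backlog
        // Per-partition cap equal to the total cap: partition 0 takes the whole fetch.
        System.out.println(Arrays.toString(simulateFetch(backlog, 1_048_576L, 1_048_576L)));
        // Smaller per-partition cap: the same budget is split across all 4 partitions.
        System.out.println(Arrays.toString(simulateFetch(backlog, 262_144L, 1_048_576L)));
        System.out.println(consumerConfigUpdates(262_144, 1_048_576));
    }
}
```

In this toy model, when the per-partition cap is as large as the total cap, one partition with a deep backlog can use the entire fetch; a smaller per-partition cap spreads the same budget across partitions. Real fetches also depend on record-batch sizes and partition ordering, so measured behavior may differ.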
>
>
> [1]
> http://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
>
>
>
> On 12 May 2020, at 07:52, Eleanore Jin <[email protected]> wrote:
>
> Hi Chamikara and Lee,
>
> Thanks for the information. I did more experiments on my local laptop
> (Flink Runner in local mode, with the Job Manager and Task Manager running
> in the same JVM).
> Setup: the input topic has 4 partitions.
> 1. With parallelism 1: KafkaIO read drains 1 partition completely to 0
> lag, then moves on to another partition.
> 2. With parallelism 2: KafkaIO read reads 2 partitions together, then
> moves on to the remaining partitions.
> 3. With parallelism 4: KafkaIO read reads all 4 partitions together.
>
> In production, we run multiple Flink Task Managers, and from the reported
> consumer lag we also see some partitions go to 0 while other
> partitions remain at high lag.
>
> Thanks!
> Eleanore
>
> On Mon, May 11, 2020 at 8:19 PM Heejong Lee <[email protected]> wrote:
>
>> If we assume that there's only one reader, all partitions are assigned to
>> a single KafkaConsumer. I think the order of reading each partition depends
>> on the KafkaConsumer implementation, i.e. how KafkaConsumer.poll() returns
>> messages.
>>
>> Reference:
>> assigning partitions:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
>> polling records:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
>> creating a record batch:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
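Heejong's point can be illustrated with a small stdlib-only Java sketch. My reading of the KafkaUnboundedReader code linked above (an assumption; please check it against the source) is that after each poll the reader cycles over the partitions and takes one record from each in turn. The class, partition names and record strings below are hypothetical; this is not Beam's code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InterleaveSketch {

    // Emits the records of one (hypothetical) poll() result by cycling over
    // partitions, one record per partition per turn, until all batches are empty.
    static List<String> interleave(Map<String, List<String>> recordsByPartition) {
        List<Iterator<String>> iters = new ArrayList<>();
        for (List<String> batch : recordsByPartition.values()) {
            iters.add(batch.iterator());
        }
        List<String> out = new ArrayList<>();
        boolean emitted = true;
        while (emitted) {
            emitted = false;
            for (Iterator<String> it : iters) {
                if (it.hasNext()) {
                    out.add(it.next());
                    emitted = true;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> poll = new LinkedHashMap<>();
        poll.put("p0", List.of("p0-a", "p0-b", "p0-c"));
        poll.put("p1", List.of("p1-a"));
        poll.put("p2", List.of()); // nothing returned for p2 in this poll
        poll.put("p3", List.of("p3-a", "p3-b"));
        System.out.println(interleave(poll)); // [p0-a, p1-a, p3-a, p0-b, p3-b, p0-c]
    }
}
```

Interleaving only reorders what a single poll() returned: if a poll returns records for just one partition, every emitted record comes from that partition. So whether partitions advance together is still decided by what KafkaConsumer.poll() hands back.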
>>
>> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> The number of partitions assigned to a given split depends on the
>>> desiredNumSplits value provided by the runner.
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>>
>>> (This assumes that you are using the Beam Kafka source, not a native
>>> Flink override.)
>>>
>>> Do you see the same behavior when you increase the number of workers in
>>> your Flink cluster?
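A minimal sketch of how partitions might be spread over splits, assuming a round-robin assignment similar to what KafkaUnboundedSource.split does (my assumption; the linked source is authoritative). SplitSketch and assign are hypothetical names, and partitions are plain integers here rather than TopicPartition objects.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {

    // Round-robin assignment of partition ids to splits; the number of splits is
    // capped by the number of partitions (assumed to mirror Beam's behavior).
    static List<List<Integer>> assign(int numPartitions, int desiredNumSplits) {
        int numSplits = Math.max(1, Math.min(desiredNumSplits, numPartitions));
        List<List<Integer>> splits = new ArrayList<>();
        for (int s = 0; s < numSplits; s++) {
            splits.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            splits.get(p % numSplits).add(p);
        }
        return splits;
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {1, 2, 4}) {
            System.out.println(parallelism + " -> " + assign(4, parallelism));
        }
        // 1 -> [[0, 1, 2, 3]]
        // 2 -> [[0, 2], [1, 3]]
        // 4 -> [[0], [1], [2], [3]]
    }
}
```

For the 4-partition topic in this thread, parallelism 1 gives one split that owns all four partitions, so the order across them falls back to a single KafkaConsumer; parallelism 4 gives each split its own partition.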
>>>
>>> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin <[email protected]>
>>> wrote:
>>>
>>>> Hi community,
>>>>
>>>> In my pipeline, I am using KafkaIO to read and write. The source topic
>>>> has 4 partitions and the pipeline parallelism is 1.
>>>>
>>>> I noticed from the consumer lag metrics that it consumes from 1 partition
>>>> until all the messages from that partition are processed, and only then
>>>> consumes from another partition.
>>>>
>>>> Is this the expected behavior?
>>>>
>>>> The runner is Flink.
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>
>
