[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377644#comment-17377644 ]

Colin McCabe edited comment on KAFKA-13008 at 7/8/21, 11:41 PM:
----------------------------------------------------------------

bq. Surely if a partition was removed from the assignment and then added back, 
this should constitute a new 'session', and thus it should get the metadata 
again on assignment

No, the set of partitions in a fetch session may change without creating a new 
session. Sessions wouldn't be much use otherwise, since consumers frequently 
change their subscriptions, mute partitions, and so on.
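
To make that concrete, here is a minimal client-side sketch (the consumer calls 
are the real API; the comments describe KIP-227 session behavior conceptually, 
not broker internals):

{code:java}
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class SessionChangeSketch {
    // Sketch only: each call below changes which partitions the consumer fetches,
    // but none of them tears down the broker-side fetch session; under KIP-227 the
    // existing session is modified incrementally.
    static void muteAndUnmute(Consumer<byte[], byte[]> consumer) {
        TopicPartition a1 = new TopicPartition("topicA", 1);
        consumer.subscribe(List.of("topicA"));
        consumer.poll(Duration.ofMillis(100));  // first fetch establishes the session
        consumer.pause(List.of(a1));            // "mute" A-1: excluded from later fetches
        consumer.poll(Duration.ofMillis(100));  // same session, now without A-1
        consumer.resume(List.of(a1));           // A-1 fetched again, still the same session
    }
}
{code}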

bq. If so, then maybe we should consider allowing metadata to remain around 
after a partition is unassigned, in case it gets this same partition back 
within the session? Could there be other consequences of this lack of metadata, 
outside of Streams?

The issue is that the metadata would be incorrect. If the fetch session doesn't 
contain a partition, we really can't say anything about its lag, other than 
potentially returning a stale cached value. And we don't know how long the 
partition might stay muted (it could be hours, for example).
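
For reference, KIP-695 exposes the cached value through {{Consumer#currentLag}}, 
which returns an empty {{OptionalLong}} rather than a stale number when nothing 
is cached. A minimal sketch of what a caller sees:

{code:java}
import java.util.OptionalLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class CurrentLagSketch {
    // Sketch: currentLag() only reflects what recent fetch responses have cached;
    // if the partition is not currently being fetched, the result is empty.
    static void printLag(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
        OptionalLong lag = consumer.currentLag(tp);
        if (lag.isPresent()) {
            System.out.println("Cached lag for " + tp + ": " + lag.getAsLong());
        } else {
            System.out.println("Lag for " + tp + " is currently unknown");
        }
    }
}
{code}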

Ultimately there is a tradeoff here between having the most up-to-date lag 
information for each partition, and efficiency. I'm not totally sure what the 
best way to resolve this is (we'd have to look at the Streams use-case more 
carefully).


> Stream will stop processing data for a long time while waiting for the 
> partition lag
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13008
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13008
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.0
>            Reporter: Luke Chen
>            Priority: Major
>         Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking the partition lag. 
> It's a good improvement for timestamp synchronization. But I found that it can 
> cause the stream to stop processing data for a long time while waiting for 
> partition metadata.
>  
> I've been investigating this case for a while, and found that the issue happens 
> in the following situation (or similar ones):
>  # Start 2 streams (each with 1 thread) consuming from topic A (with 3 
> partitions: A-0, A-1, A-2).
>  # After the 2 streams have started, the partition assignment is (I skipped some 
> other processing-related partitions for simplicity):
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # Start processing some data; assume the positions and high watermarks are now:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now stream3 joins, so a rebalance is triggered with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly stream3 leaves, so we rebalance again, back to the step-2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the position of A-1 will be: position: null, 
> highWM: null)
>  # Note that partition A-1 used to be assigned to stream1-thread1, and now it's 
> back. Also assume that partition A-1 has slow input (ex: 1 record per 30 mins) 
> and partition A-0 has fast input (ex: 10K records/sec). Now stream1-thread1 
> won't process any data until we get input from partition A-1 (even if partition 
> A-0 has a lot of data buffered, and even with {{max.task.idle.ms}} set to 0).
>  
> The reason stream1-thread1 won't process any data is that we can't get the lag 
> of partition A-1. And why can't we get the lag? Because:
>  # In KIP-695, we use the consumer's cache to get the partition lag, to avoid a 
> remote call.
>  # The lag for a partition is cleared if the assignment in this round doesn't 
> include that partition; see 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 is cleared in 
> step 4 and re-initialized (to null) in step 5.
>  # In KIP-227, we introduced fetch sessions for incremental fetch 
> requests/responses. That is, once a session exists, the client (consumer) only 
> gets an update for a partition when that partition has an update (ex: new data). 
> So, in the above case, since partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't get an update for up to 30 minutes, unless the fetch session 
> becomes inactive for (by default) 2 minutes and is evicted. Either way, the 
> metadata won't be updated for a while. (A sketch of the resulting behavior 
> follows this list.)
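
For illustration, a rough sketch of the effect described in points 1-3 above; it 
uses manual {{assign()}} for brevity (Streams actually gets its assignment 
through subscribe() and rebalancing), but the cached-lag lifecycle is the same 
idea:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.OptionalLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class LagClearedOnReassignSketch {
    static void reassignAndCheck(Consumer<byte[], byte[]> consumer) {
        TopicPartition a0 = new TopicPartition("topicA", 0);
        TopicPartition a1 = new TopicPartition("topicA", 1);

        consumer.assign(List.of(a0, a1));
        consumer.poll(Duration.ofSeconds(1));  // fetch responses populate the cached lag

        consumer.assign(List.of(a0));          // A-1 removed: its cached lag is cleared
        consumer.assign(List.of(a0, a1));      // A-1 re-added: lag re-initialized to "unknown"

        // Empty until a fetch response that includes A-1 arrives -- with an incremental
        // fetch session and no new data on A-1, that can take a long time.
        OptionalLong lag = consumer.currentLag(a1);
        System.out.println("A-1 lag known after re-assignment? " + lag.isPresent());
    }
}
{code}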
>  
> In KIP-695, if we can't get the partition lag, we can't determine the 
> partition's data status for timestamp synchronization, so we keep waiting and 
> don't process any data. That's why this issue happens.
>  
> *Proposed solution:*
>  # If we can't get the current lag for a partition, or the current lag is > 0, 
> start waiting for max.task.idle.ms, and reset the deadline once we get the 
> partition lag, similar to what we did previously for KIP-353 (sketched below).
>  # Introduce a waiting-time config for the case where there is no partition lag, 
> or the partition lag stays > 0 (needs a KIP).
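
A rough sketch of one possible reading of option 1; the class, field, and method 
names below are illustrative only, not actual Streams internals:

{code:java}
import java.util.OptionalLong;

class IdleDeadlineSketch {
    // Illustrative only: start an idling deadline when the lag is unknown or > 0,
    // and clear it once the cached lag reports the partition as caught up.
    private long deadlineMs = -1L;  // -1 means "no deadline currently running"

    boolean readyToProcess(OptionalLong cachedLag, long maxTaskIdleMs, long nowMs) {
        if (cachedLag.isPresent() && cachedLag.getAsLong() == 0) {
            deadlineMs = -1L;       // caught up: no reason to keep idling
            return true;
        }
        if (deadlineMs == -1L) {
            deadlineMs = nowMs + maxTaskIdleMs;  // start the idling window
        }
        return nowMs >= deadlineMs; // process anyway once we have idled long enough
    }
}
{code}
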
> [~vvcephei] [~guozhang], any suggestions?
>  
> cc [~ableegoldman] [~mjsax], this is the root cause of the issue we discussed in 
> [https://github.com/apache/kafka/pull/10736], where we thought there was a data 
> loss situation. FYI.


