[ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379505#comment-17379505
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13008:
------------------------------------------------

Thanks Guozhang, +1 on that approach (though I'll let Colin confirm whether #1 
does make sense or not). We'll definitely need a Streams/client side fix if the 
'real' fix is going to be on the broker side. My only question is whether this 
is something that might trip up other plain consumer client users in addition 
to Streams, and if so, whether there's anything we could do in the consumer 
client itself. But AFAIK it's only Streams that really relies on this metadata 
in this critical way, so I'm happy with the Streams-side fix as well

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13008
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13008
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.0
>            Reporter: Luke Chen
>            Priority: Blocker
>             Fix For: 3.0.0
>
>         Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking partition lag. 
> It's a good improvement for timestamp sync. But I found it will cause the 
> stream stop processing the data for a long time while waiting for the 
> partition metadata.
>  
> I've been investigating this case for a while, and figuring out the issue 
> will happen in below situation (or similar situation):
>  # start 2 streams (each with 1 thread) to consume from a topicA (with 3 
> partitions: A-0, A-1, A-2)
>  # After 2 streams started, the partitions assignment are: (I skipped some 
> other processing related partitions for simplicity)
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # start processing some data, assume now, the position and high watermark is:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now, stream3 joined, so trigger rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly, stream3 left, so now, rebalance again, with the step 2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the  position of A-1 will be: position: null, 
> highWM: null)
>  # Now, note that, the partition A-1 used to get assigned to stream1-thread1, 
> and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 
> record per 30 mins), and partition A-0 has fast input (ex: 10K records / 
> sec). So, now, the stream1-thread1 won't process any data until we got input 
> from partition A-1 (even if partition A-0 is buffered a lot, and we have 
> `{{max.task.idle.ms}}` set to 0).
>  
> The reason why the stream1-thread1 won't process any data is because we can't 
> get the lag of partition A-1. And why we can't get the lag? It's because
>  # In KIP-695, we use consumer's cache to get the partition lag, to avoid 
> remote call
>  # The lag for a partition will be cleared if the assignment in this round 
> doesn't have this partition. check 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 will be 
> cleared in step 4, and re-initialized (to null) in step 5
>  # In KIP-227, we introduced a fetch session to have incremental fetch 
> request/response. That is, if the session existed, the client(consumer) will 
> get the update only when the fetched partition have update (ex: new data). 
> So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't have update until next 30 mins, or wait for the fetch session 
> become inactive for (default) 2 mins to be evicted. Either case, the metadata 
> won't be updated for a while.
>  
> In KIP-695, if we don't get the partition lag, we can't determine the 
> partition data status to do timestamp sync, so we'll keep waiting and not 
> processing any data. That's why this issue will happen.
>  
> *Proposed solution:*
>  # If we don't get the current lag for a partition, or the current lag > 0, 
> we start to wait for max.task.idle.ms, and reset the deadline when we get the 
> partition lag, like what we did in previous KIP-353
>  # Introduce a waiting time config when no partition lag, or partition lag 
> keeps > 0 (need KIP)
> [~vvcephei] [~guozhang] , any suggestions?
>  
> cc [~ableegoldman]  [~mjsax] , this is the root cause that in 
> [https://github.com/apache/kafka/pull/10736,] we discussed and thought 
> there's a data lose situation. FYI.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to