[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379505#comment-17379505 ] A. Sophie Blee-Goldman commented on KAFKA-13008:

Thanks Guozhang, +1 on that approach (though I'll let Colin confirm whether #1 makes sense or not). We'll definitely need a Streams/client-side fix if the 'real' fix is going to be on the broker side. My only question is whether this is something that might trip up other plain consumer client users in addition to Streams, and if so, whether there's anything we could do in the consumer client itself. But AFAIK it's only Streams that really relies on this metadata in this critical way, so I'm happy with the Streams-side fix as well.

> Stream will stop processing data for a long time while waiting for the partition lag
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.0.0
> Reporter: Luke Chen
> Priority: Blocker
> Fix For: 3.0.0
> Attachments: image-2021-07-07-11-19-55-630.png
>
> In KIP-695, we improved the task idling mechanism by checking the partition lag. It's a good improvement for timestamp synchronization, but I found it can cause the stream to stop processing data for a long time while waiting for partition metadata.
>
> I've been investigating this case for a while, and the issue occurs in the following situation (or similar situations):
> # Start 2 Streams instances (each with 1 thread) consuming from topicA (with 3 partitions: A-0, A-1, A-2).
> # After both streams start, the partition assignment is (skipping some other processing-related partitions for simplicity):
> stream1-thread1: A-0, A-1
> stream2-thread1: A-2
> # They process some data; assume the positions and high watermarks are now:
> A-0: offset: 2, highWM: 2
> A-1: offset: 2, highWM: 2
> A-2: offset: 2, highWM: 2
> # Now stream3 joins, triggering a rebalance with this assignment:
> stream1-thread1: A-0
> stream2-thread1: A-2
> stream3-thread1: A-1
> # Suddenly stream3 leaves, so we rebalance again, back to the step-2 assignment:
> stream1-thread1: A-0, *A-1*
> stream2-thread1: A-2
> (Note: after initialization, the state of A-1 will be: position: null, highWM: null.)
> # Note that partition A-1 used to be assigned to stream1-thread1, and now it's back. Also assume A-1 has slow input (ex: 1 record per 30 mins) and A-0 has fast input (ex: 10K records/sec). Now stream1-thread1 won't process any data until we get input from partition A-1, even if partition A-0 has a lot of data buffered and {{max.task.idle.ms}} is set to 0.
>
> The reason stream1-thread1 won't process any data is that we can't get the lag of partition A-1. And why can't we get the lag?
> # In KIP-695, we use the consumer's cache to get the partition lag, to avoid a remote call.
> # The lag for a partition is cleared if the assignment in this round doesn't include that partition; check [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. So in the example above, the metadata cache for partition A-1 is cleared in step 4 and re-initialized (to null) in step 5.
> # In KIP-227, we introduced fetch sessions for incremental fetch requests/responses. That is, while a session exists, the client (consumer) only receives an update for a fetched partition when that partition has an update (ex: new data). In the case above, partition A-1 has slow input (ex: 1 record per 30 mins), so it won't get an update for up to 30 minutes, or until the fetch session has been inactive for (by default) 2 minutes and is evicted. Either way, the metadata won't be updated for a while.
>
> In KIP-695, if we can't get the partition lag, we can't determine the partition's data status for timestamp synchronization, so we keep waiting and process no data. That's why this issue happens.
>
> *Proposed solutions:*
> # If we can't get the current lag for a partition, or the current lag is > 0, start waiting for max.task.idle.ms, and reset the deadline when we do get the partition lag, like what we did previously under KIP-353.
> # Introduce a waiting-time config for when there is no partition lag, or the lag stays > 0 (needs a KIP).
> [~vvcephei] [~guozhang], any suggestions?
>
> cc [~ableegoldman] [~mjsax], this is the root cause behind [https://github.com/apache/kafka/pull/10736,] where we discussed what we thought was a data-loss situation. FYI.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
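To make the failure mode in the report concrete, here is a minimal Python model of the lag-cache behavior described above. All names are hypothetical; this is a sketch of the mechanism, not Kafka's actual SubscriptionState or Streams code.

```python
class LagCache:
    """Models the consumer's per-partition lag cache: an assignment change
    drops state for partitions that left, so a re-added partition starts
    over with an unknown (None) lag."""
    def __init__(self):
        self.lag = {}  # partition -> int lag, or None when unknown

    def assign(self, partitions):
        # state survives only for partitions retained across the rebalance
        self.lag = {p: self.lag.get(p) for p in partitions}

    def record_fetch(self, partition, lag):
        self.lag[partition] = lag

def ready_to_process(cache, partitions):
    # Simplified KIP-695-style gate: every partition's lag must be known and
    # zero. (Real Streams also counts buffered records as "ready"; that
    # detail is omitted here.)
    return all(cache.lag.get(p) == 0 for p in partitions)

cache = LagCache()
cache.assign(["A-0", "A-1"])          # step 2 assignment
cache.record_fetch("A-0", 0)
cache.record_fetch("A-1", 0)
assert ready_to_process(cache, ["A-0", "A-1"])

cache.assign(["A-0"])                 # step 4: A-1 moves away, lag dropped
cache.assign(["A-0", "A-1"])          # step 5: A-1 is back, lag is None
cache.record_fetch("A-0", 0)
# A-1's lag stays unknown until the fetch session mentions it again, so the
# thread idles even though A-0 is fully caught up and has data buffered.
assert not ready_to_process(cache, ["A-0", "A-1"])
```

The key point the model shows: nothing is wrong with A-0, but the unknown lag on A-1 alone is enough to block processing.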
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379482#comment-17379482 ] Guozhang Wang commented on KAFKA-13008: ---

Thanks for the great find [~showuon]! I took a look at the server-side code, and I think we can consider two things:

1) Slightly augment the session-handling logic so that within a session, if a partition was newly requested (here, we would not try to distinguish whether it was requested for the very first time or re-added after a while), then even if the requested position has reached the log end, i.e. there's no data to return, we still include it in the response to encode the log-end information. WDYT [~cmccabe]?

2) Since 1) would be a broker change, and even with it in place it may not cover all cases for Streams, we would still need some remedies. One (somewhat hacky..) idea is to actually pay the round-trip in such cases (only when the config is set to >= 0); but since a fetch request would not work for older-versioned brokers, we would use the offsets request (either the consumer or the admin client has an API for this) to get the log-end offset. In fact, when Streams gets its assigned partitions, it already has to fetch the log-end offsets of the changelogs first to check whether any restoration is needed; we could simply add the source topic partitions in that phase as well and expose the results as the initial values for the main consumer. Note this is only done once after every rebalance, and no more.
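The second remedy above can be sketched as follows: after a rebalance, pay one explicit round-trip (the ListOffsets API in real Kafka, via the consumer's or admin client's end-offsets call) for partitions whose cached lag is unknown, and seed the lag cache from the result. This is a hypothetical Python model; `fetch_end_offsets` stands in for the real client call and is not an actual Kafka API.

```python
def seed_lags_after_rebalance(assigned, positions, fetch_end_offsets, lag_cache):
    """Seed unknown lags with one blocking end-offset lookup, instead of
    waiting for the incremental fetch session to mention the partitions."""
    unknown = [p for p in assigned if lag_cache.get(p) is None]
    if unknown:
        end_offsets = fetch_end_offsets(unknown)  # one round-trip, once per rebalance
        for p in unknown:
            lag_cache[p] = max(0, end_offsets[p] - positions[p])
    return lag_cache

# A-1 just came back from a rebalance; its cached lag is unknown.
positions = {"A-0": 2, "A-1": 2}
lag_cache = {"A-0": 0, "A-1": None}
broker_end_offsets = {"A-1": 2}  # what the broker would report as log end

seed_lags_after_rebalance(
    ["A-0", "A-1"], positions,
    lambda ps: {p: broker_end_offsets[p] for p in ps},
    lag_cache)
assert lag_cache["A-1"] == 0  # lag now known: caught up, processing can resume
```

Because the lookup only covers partitions with an unknown lag, the steady-state cost stays at zero; the round-trip is paid only right after a rebalance.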
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379462#comment-17379462 ] A. Sophie Blee-Goldman commented on KAFKA-13008:

{quote}Re-reading KIP-227, it seems like there should be a way for the client to add re-acquired partitions like this to the incremental fetch request so that it can reinitialize its metadata cache. In other words, it seems like getting a partition assigned that you haven't owned for a while is effectively the same case as getting a partition that you've never owned, and there does seem to be a mechanism for the latter.{quote}

Thanks John, that is exactly what I was trying to suggest above, but I may have muddled it with my lack of understanding of the incremental fetch design.

Given how long this bug went unnoticed and the in-depth investigation it took to uncover it (again, nicely done [~showuon]), it seems like any user of the plain consumer client, in addition to Streams, could easily be tripped up by this. And just personally, I had to read the analysis twice to really understand what was going on, since the behavior was/is so unintuitive to me.
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17378338#comment-17378338 ] John Roesler commented on KAFKA-13008: --

Woah, this is an excellent find, [~showuon]! You and Sophie are absolutely right. We chose to use the cached lag so that we wouldn't have to wait for a full round-trip every time we want to know whether we're approximately caught up. Colin is right, too; we don't want to use a metadata cache that can be arbitrarily old. There are plenty of applications that go days without a rebalance, which would certainly violate the semantics we're going for here.

Re-reading KIP-227, it seems like there should be a way for the client to add re-acquired partitions like this to the incremental fetch request so that it can reinitialize its metadata cache. In other words, getting a partition assigned that you haven't owned for a while is effectively the same case as getting a partition that you've never owned, and there does seem to be a mechanism for the latter. Does that sound right to you, [~cmccabe]?
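The mechanism John is pointing at can be modeled as a diff between the session's current partition set and the newly desired set: a partition re-entering the session goes into the "added" set, which in KIP-227 means the broker sends its full metadata (including the high watermark) in the next response. This is an illustrative Python sketch, not the real FetchSessionHandler API.

```python
def build_incremental_request(session_partitions, desired_partitions):
    """Diff the session's view against the desired set. 'added' partitions
    get full metadata back from the broker; 'removed' leave the session."""
    added = [p for p in desired_partitions if p not in session_partitions]
    removed = [p for p in session_partitions if p not in desired_partitions]
    return {"added": added, "removed": removed}

session = {"A-0", "A-1"}
build_incremental_request(session, {"A-0"})              # step 4: A-1 unassigned
session = {"A-0"}
req = build_incremental_request(session, {"A-0", "A-1"}) # step 5: A-1 re-assigned
# A-1 is treated like a brand-new partition in the session, so full metadata
# (position, high watermark) would come back on the next fetch response.
assert req["added"] == ["A-1"]
```

Under this framing, "re-acquired after an absence" and "never owned" really are the same case: both fall outside `session_partitions`, so both land in `added`.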
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377644#comment-17377644 ] Colin McCabe commented on KAFKA-13008: --

> Surely if a partition was removed from the assignment and then added back, this should constitute a new 'session', and thus it should get the metadata again on assignment

No, sessions may be changed without creating a new session. They wouldn't be much use otherwise, since many consumers often change their subscriptions or mute partitions, etc.

> If so, then maybe we should consider allowing metadata to remain around after a partition is unassigned, in case it gets this same partition back within the session? Could there be other consequences of this lack of metadata, outside of Streams?

The issue is that the metadata would be incorrect. If the fetch session doesn't contain a partition, we really can't say anything about what its lag is, other than potentially caching a stale value. And we don't know how long the partition could stay muted (it could be hours, for example). Ultimately there is a tradeoff here between having the most up-to-date lag information for each partition and efficiency. I'm not totally sure what the best way to resolve this is (we'd have to look at the Streams use case more carefully).
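One way to frame the staleness concern Colin raises (this is a hypothetical sketch of the tradeoff, not something Kafka does today) is a lag cache that records when each value was observed and refuses to serve values older than a bound, so that a long-muted or long-unassigned partition can never feed arbitrarily old lag into the idling decision:

```python
import time

class BoundedStalenessLagCache:
    """Lag values carry a timestamp; values older than max_age_ms are
    treated as unknown rather than served stale."""
    def __init__(self, max_age_ms):
        self.max_age_ms = max_age_ms
        self._entries = {}  # partition -> (lag, recorded_at_ms)

    def record(self, partition, lag, now_ms=None):
        now_ms = time.time() * 1000 if now_ms is None else now_ms
        self._entries[partition] = (lag, now_ms)

    def current_lag(self, partition, now_ms=None):
        now_ms = time.time() * 1000 if now_ms is None else now_ms
        entry = self._entries.get(partition)
        if entry is None or now_ms - entry[1] > self.max_age_ms:
            return None  # too old to trust: report unknown
        return entry[0]

lags = BoundedStalenessLagCache(max_age_ms=120_000)
lags.record("A-1", 0, now_ms=0)
assert lags.current_lag("A-1", now_ms=60_000) == 0      # fresh enough to use
assert lags.current_lag("A-1", now_ms=600_000) is None  # stale value rejected
```

This doesn't resolve the tradeoff; it only makes it explicit: a larger bound means fewer stalls but older lag information, and an application that goes days between rebalances would still need some refresh path for partitions the session never mentions.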
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376189#comment-17376189 ] Luke Chen commented on KAFKA-13008: ---

Actually, I'm not very familiar with the details of incremental sessions either. But from [KIP-227|https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability], we can see the old session will be evicted only when one of the following 3 conditions is met:

!image-2021-07-07-11-19-55-630.png|width=762,height=161!

And because in step 5 none of the 3 conditions above match, the new session won't evict the old session. Also, during that time, the old session already contains the "up-to-date" partition info for partition A-1, so no update for A-1 will be received. This is my understanding; please correct me if I'm wrong. Thank you.
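The incremental-response behavior Luke describes can be modeled simply: within a live session the broker includes a partition in the response only when its state differs from what the session last sent. A partition with no traffic therefore produces no updates, and a client that just cleared its cache for that partition learns nothing from subsequent responses. A hypothetical Python sketch (not Kafka's actual server code):

```python
def incremental_response(session_state, broker_state):
    """Return only partitions whose broker-side state changed since the
    session last sent them; update the session's view as a side effect."""
    out = {}
    for partition, state in broker_state.items():
        if session_state.get(partition) != state:
            out[partition] = state
            session_state[partition] = state
    return out

session_state = {}
broker = {"A-0": {"highWM": 2}, "A-1": {"highWM": 2}}
first = incremental_response(session_state, broker)  # new session: everything sent
assert set(first) == {"A-0", "A-1"}

broker["A-0"] = {"highWM": 10_002}                   # fast partition keeps moving
second = incremental_response(session_state, broker)
# A-1 had no update, so the response omits it entirely; the client's freshly
# cleared A-1 metadata stays empty until new data arrives or the session is
# evicted (~2 minutes idle by default).
assert set(second) == {"A-0"}
```

This is exactly why the bug needs either a 30-minute record on A-1 or session eviction to self-heal: from the session's point of view, A-1 is already fully up to date.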
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376019#comment-17376019 ] A. Sophie Blee-Goldman commented on KAFKA-13008: Nice find! I agree, this does not seem like the expected behavior, and given that it's been causing a test to fail regularly I think we can assert that this should not happen. One thing I don't understand, and maybe this is because I don't have much context on the incremental fetch internals, is: why would we not get the metadata again after the partition is re-assigned in step 5? Surely if a partition was removed from the assignment and then added back, this should constitute a new 'session', and thus it should get the metadata again on assignment? If not that sounds like a bug in the incremental fetch to me, but again, I'm not too familiar with it so there could be a valid reason it works this way. If so, then maybe we should consider allowing metadata to remain around after a partition is unassigned, in case it gets this same partition back within the session? Could there be other consequences of this lack of metadata, outside of Streams? > Stream will stop processing data for a long time while waiting for the > partition lag > > > Key: KAFKA-13008 > URL: https://issues.apache.org/jira/browse/KAFKA-13008 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Luke Chen >Priority: Major > > In KIP-695, we improved the task idling mechanism by checking partition lag. > It's a good improvement for timestamp sync. But I found it will cause the > stream stop processing the data for a long time while waiting for the > partition metadata. 
> > I've been investigating this case for a while, and figuring out the issue > will happen in below situation (or similar situation): > # start 2 streams (each with 1 thread) to consume from a topicA (with 3 > partitions: A-0, A-1, A-2) > # After 2 streams started, the partitions assignment are: (I skipped some > other processing related partitions for simplicity) > stream1-thread1: A-0, A-1 > stream2-thread1: A-2 > # start processing some data, assume now, the position and high watermark is: > A-0: offset: 2, highWM: 2 > A-1: offset: 2, highWM: 2 > A-2: offset: 2, highWM: 2 > # Now, stream3 joined, so trigger rebalance with this assignment: > stream1-thread1: A-0 > stream2-thread1: A-2 > stream3-thread1: A-1 > # Suddenly, stream3 left, so now, rebalance again, with the step 2 > assignment: > stream1-thread1: A-0, *A-1* > stream2-thread1: A-2 > (note: after initialization, the position of A-1 will be: position: null, > highWM: null) > # Now, note that, the partition A-1 used to get assigned to stream1-thread1, > and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 > record per 30 mins), and partition A-0 has fast input (ex: 10K records / > sec). So, now, the stream1-thread1 won't process any data until we got input > from partition A-1 (even if partition A-0 is buffered a lot, and we have > `{{max.task.idle.ms}}` set to 0). > > The reason why the stream1-thread1 won't process any data is because we can't > get the lag of partition A-1. And why we can't get the lag? It's because > # In KIP-695, we use consumer's cache to get the partition lag, to avoid > remote call > # The lag for a partition will be cleared if the assignment in this round > doesn't have this partition. check > [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. 
> So, in the above example, the metadata cache for partition A-1 is > cleared in step 4 and re-initialized (to null) in step 5 > # In KIP-227, we introduced fetch sessions for incremental fetch > requests/responses. That is, if a session exists, the client (consumer) > only gets an update when a fetched partition has an update (ex: new data). > So in the above case, since partition A-1 has slow input (ex: 1 record per 30 > mins), it won't get an update until the next record arrives (up to 30 mins later), or until the fetch session > has been inactive for the (default) 2 mins and is evicted. Either way, the metadata > won't be updated for a while. > > In KIP-695, if we can't get the partition lag, we can't determine the > partition's data status for timestamp sync, so we keep waiting and don't process > any data. That's why this issue happens. > > *Proposed solution:* > # If we don't get the current lag for a partition, or the current lag is > 0, > we start to wait for max.task.idle.ms, and reset the deadline when we get the > partition lag, like what we did previously in KIP-353 > # Introduce a waiting-time config for when there is no partition lag, or the > partition lag stays > 0 (needs a KIP) > [~vvcephei] [~guozhang] , any suggestions? > > cc [~ableegoldman] [~mjsax] , this is the root cause of the issue that, in > [https://github.com/apache/kafka/pull/10736,] we discussed and thought > was a data loss situation. FYI. 
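The deadline behavior in proposed solution #1 can be sketched roughly as follows. This is a hedged sketch only: the class and method names ({{IdlingDecision}}, {{readyToProcess}}) are made up for illustration and are not the actual Streams implementation.

```java
import java.util.OptionalLong;

// Sketch of the lag-based idling decision from the proposed solution:
// if the cached lag for a partition is unknown (e.g. cleared by a
// rebalance) or still > 0, wait at most max.task.idle.ms, resetting the
// deadline once the lag becomes known again (KIP-353-style enforcement).
public class IdlingDecision {
    private static final long NO_DEADLINE = -1L;

    private final long maxTaskIdleMs;
    private long deadlineMs = NO_DEADLINE;
    private boolean lagKnown = false;

    public IdlingDecision(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    /** Returns true if the task may go ahead and process buffered records. */
    public boolean readyToProcess(OptionalLong cachedLag, long nowMs) {
        if (cachedLag.isPresent() && cachedLag.getAsLong() == 0L) {
            // Fully caught up on this partition: nothing to wait for.
            deadlineMs = NO_DEADLINE;
            lagKnown = true;
            return true;
        }
        boolean known = cachedLag.isPresent();
        if (deadlineMs == NO_DEADLINE || known != lagKnown) {
            // Start the idle deadline, or reset it when the lag first
            // becomes known (or unknown) again after a rebalance.
            deadlineMs = nowMs + maxTaskIdleMs;
            lagKnown = known;
        }
        return nowMs >= deadlineMs;
    }
}
```

Note that with {{max.task.idle.ms}} set to 0, the deadline expires immediately, so an unknown lag would no longer block processing indefinitely as described in the report.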
[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371930#comment-17371930 ] Luke Chen commented on KAFKA-13008: --- [~vvcephei] [~guozhang] , any suggestions for this issue? IMO, this is not the expected behavior: what we expect to be waiting on is partition info that has not been retrieved yet. But in this case, the partition info has indeed been retrieved (from the fetch session's perspective); it's the consumer's cached lag for that partition that has been cleared. Thank you. -- This message was sent by Atlassian Jira (v8.3.4#803005)
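Luke's point about the cleared partition cache can be illustrated with a toy model of the consumer-side bookkeeping. The names here ({{LagCache}}, {{onFetchResponse}}) are made up for illustration; Kafka's real bookkeeping lives in {{SubscriptionState}} and is more involved.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Toy model: per-partition lag lives in a client-side cache, and
// installing a new assignment drops the state of every partition not in
// that assignment, even if the same partition comes back later.
public class LagCache {
    private final Map<String, Long> lagByPartition = new HashMap<>();
    private Set<String> assignment = new HashSet<>();

    public void assign(Set<String> newAssignment) {
        // Discard state for partitions that left the assignment.
        lagByPartition.keySet().retainAll(newAssignment);
        assignment = new HashSet<>(newAssignment);
    }

    // A fetch response for an assigned partition refreshes the cached lag.
    public void onFetchResponse(String partition, long position, long highWatermark) {
        if (assignment.contains(partition)) {
            lagByPartition.put(partition, highWatermark - position);
        }
    }

    // Empty means the lag is unknown, e.g. right after a rebalance.
    public OptionalLong lag(String partition) {
        Long lag = lagByPartition.get(partition);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }
}
```

Replaying steps 2-5 of the report against this model: after A-1 is reassigned away and then back, its lag is empty, and because an incremental fetch session only sends updates for partitions with new data, the response that would repopulate it may be up to 30 minutes away.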