[ 
https://issues.apache.org/jira/browse/KAFKA-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899909#comment-17899909
 ] 

Matthias J. Sax edited comment on KAFKA-17299 at 11/21/24 1:20 AM:
-------------------------------------------------------------------

Btw: I would expect that 
`StreamTaskTest#shouldPauseAndResumeBasedOnBufferedRecords` needs to be updated 
– I expect your patch to break the test, due to an "off by one error". Please 
include an update to this test, too, to reflect the new "resuming" condition.


was (Author: mjsax):
Btw: I would expect that 
`StreamTaskTest#shouldPauseAndResumeBasedOnBufferedRecords` needs to be updated 
– I expect your patch to break the test, due to an "off by one error". Please 
include an update to this test, too, to reflect the new "pausing" condition.

> Kafka Streams consumer stops consumption
> ----------------------------------------
>
>                 Key: KAFKA-17299
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17299
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.2, 3.6.2, 3.8.0, 3.7.1
>            Reporter: Laxman
>            Assignee: Laxman
>            Priority: Major
>         Attachments: KAFKA-17299.patch
>
>
> We are using kafka clients version 3.5.2. However, from our code review, the 
> bug appears to exist in the current version as well. 
> In one of our clusters, kafka streams consumption completely stops and 
> doesn't recover even after a restart of the consumer instance/pod. These are 
> our findings/observations from our debugging.
>  * We see some deserialisation errors while the streams are consuming. 
>  * We configured LogAndContinueExceptionHandler as the exception handler to 
> handle deserialisation failures.
>  * Streams consumption doesn't stop on every deserialisation failure/error. 
>  * We notice that the consumption hangs only when the first message in the 
> current batch is faulty and fails to deserialise.
> We did a thorough inspection of the kafka clients code and debugged by 
> patching it with additional logs. We found that the following lines of code in 
> StreamTask.java seem to be the issue.
> *Original* - The original code in 
> [StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]
> {code:java}
>             // after processing this record, if its partition queue's buffered size has been
>             // decreased to the threshold, we can then resume the consumption on this partition
>             if (recordInfo.queue().size() == maxBufferedSize) {
>                 mainConsumer.resume(singleton(partition));
>             }
> {code}
>  
> *Patched* - The issue is resolved for us after this fix.
> {code:java}
>             // after processing this record, if its partition queue's buffered size has been
>             // decreased to the threshold, we can then resume the consumption on this partition
>             if (recordInfo.queue().size() <= maxBufferedSize) {
>                 mainConsumer.resume(singleton(partition));
>             }
> {code}
>  
> The original code resumes consumption only when the queue size exactly matches 
> the max buffered size. I think some record accounting has gone wrong, especially 
> when there is an issue deserialising the first record in the batch.
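>
> To illustrate the accounting problem, here is a toy sketch in plain Java. The 
> Deque and MAX_BUFFERED_SIZE below are stand-ins for the real RecordQueue and 
> buffered.records.per.partition; this is not the actual Streams code, just a 
> model of the bookkeeping. When the handler drops a corrupt record, a single 
> "take next record" step removes more than one entry from the buffer, so the 
> size steps over the exact threshold and the strict equality check never fires, 
> while the patched "<=" check still does.
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Deque;
>
> // Toy model of the pause/resume bookkeeping described above (not the real
> // Streams classes).
> public class ResumeConditionSketch {
>
>     static final int MAX_BUFFERED_SIZE = 5;
>
>     public static void main(final String[] args) {
>         final Deque<String> queue = new ArrayDeque<>();
>
>         // Buffer 6 records; the first one is corrupt and will be dropped by a
>         // LogAndContinue-style handler instead of being processed.
>         queue.add("corrupt");
>         for (int i = 0; i < 5; i++) {
>             queue.add("good-" + i);
>         }
>         boolean paused = queue.size() > MAX_BUFFERED_SIZE;  // 6 > 5 -> partition paused
>
>         // One processing step: the corrupt head record is logged and dropped,
>         // and the next good record is processed, so the buffered size falls
>         // from 6 straight to 4 and is never observed as exactly 5.
>         while (!queue.isEmpty() && queue.peek().equals("corrupt")) {
>             queue.poll();
>         }
>         final String processed = queue.poll();
>
>         if (queue.size() == MAX_BUFFERED_SIZE) {   // original check: 4 == 5 -> stays paused
>             paused = false;
>         }
>         System.out.println("with '==': paused=" + paused + ", size=" + queue.size());
>
>         if (queue.size() <= MAX_BUFFERED_SIZE) {   // patched check: 4 <= 5 -> resumes
>             paused = false;
>         }
>         System.out.println("with '<=': paused=" + paused + ", processed=" + processed);
>     }
> }
> {code}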



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
