[ https://issues.apache.org/jira/browse/KAFKA-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899909#comment-17899909 ]
Matthias J. Sax edited comment on KAFKA-17299 at 11/21/24 1:20 AM:
-------------------------------------------------------------------

Btw: I would expect that `StreamTaskTest#shouldPauseAndResumeBasedOnBufferedRecords` needs to be updated – I expect your patch to break that test, due to an "off by one error". Please include an update to this test, too, to reflect the new "resuming" condition.

was (Author: mjsax):
Btw: I would expect that `StreamTaskTest#shouldPauseAndResumeBasedOnBufferedRecords` needs to be updated – I expect your patch to break that test, due to an "off by one error". Please include an update to this test, too, to reflect the new "pausing" condition.

> Kafka Streams consumer stops consumption
> ----------------------------------------
>
>                 Key: KAFKA-17299
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17299
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.2, 3.6.2, 3.8.0, 3.7.1
>            Reporter: Laxman
>            Assignee: Laxman
>            Priority: Major
>         Attachments: KAFKA-17299.patch
>
> We are using kafka clients version 3.5.2. However, from our code review, the bug appears to exist in the current version as well.
>
> In one of our clusters, kafka streams consumption completely stops and doesn't recover even after a restart of the consumer instance/pod. These are our findings/observations from our debugging:
> * We see some deserialisation errors while the streams application is consuming.
> * We configured LogAndContinueExceptionHandler as the exception handler for deserialisation failures.
> * Streams consumption doesn't stop on every deserialisation failure/error.
> * We notice that consumption hangs only when the first message in the current batch is faulty and fails to deserialise.
>
> We did a thorough inspection of the kafka clients code and debugged by patching in additional logs, and we found that the following lines of code from StreamTask.java seem to be the issue.
>
> *Original* - Original code [StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]
> {code:java}
>             // after processing this record, if its partition queue's buffered size has been
>             // decreased to the threshold, we can then resume the consumption on this partition
>             if (recordInfo.queue().size() == maxBufferedSize) {
>                 mainConsumer.resume(singleton(partition));
>             }
> {code}
>
> *Patched* - Issue resolved after this fix for us.
> {code:java}
>             // after processing this record, if its partition queue's buffered size has been
>             // decreased to the threshold, we can then resume the consumption on this partition
>             if (recordInfo.queue().size() <= maxBufferedSize) {
>                 mainConsumer.resume(singleton(partition));
>             }
> {code}
>
> We resume consumption only when the queue size exactly matches the max buffered size. I think some record accounting has gone wrong, especially when there is an issue with deserialising the first record in the batch.
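To make the off-by-one concrete, below is a small standalone Java sketch. It is not the actual StreamTask or StreamTaskTest code; the class name, the buffer values, and the way the faulty record is dropped without a resume check are illustrative assumptions based on the description above. It shows how, if the faulty first record is removed outside the normal per-record bookkeeping, the queue size can skip past maxBufferedSize, so the strict `==` condition never fires while `<=` does.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Standalone sketch (not the real StreamTask code). Assumes, per the report,
 * that dropping a faulty first record can shrink the partition queue without
 * the usual per-record resume check running, so the queue size skips past
 * maxBufferedSize.
 */
public class ResumeConditionSketch {

    static final int MAX_BUFFERED_SIZE = 3;

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        // 4 buffered records > maxBufferedSize, so the partition is paused.
        for (int i = 0; i < 4; i++) {
            queue.add("record-" + i);
        }
        boolean resumedStrict = false;   // original '==' check
        boolean resumedRelaxed = false;  // patched  '<=' check

        // Assumed failure mode: the faulty first record is dropped by the
        // deserialization error handling, shrinking the queue from 4 to 3
        // without a resume check running at that point.
        queue.poll();

        while (!queue.isEmpty()) {
            queue.poll(); // process one buffered record

            // Strict condition: size goes 2, 1, 0 and never equals 3.
            if (queue.size() == MAX_BUFFERED_SIZE) {
                resumedStrict = true;
            }
            // Relaxed condition: already true on the first check.
            if (queue.size() <= MAX_BUFFERED_SIZE) {
                resumedRelaxed = true;
            }
        }

        System.out.println("resumed with '==' : " + resumedStrict);  // false -> partition stays paused
        System.out.println("resumed with '<=' : " + resumedRelaxed); // true  -> consumption resumes
    }
}
{code}

Under these assumptions the `==` variant leaves the partition paused forever, which matches the reported hang, and the `<=` variant resumes it; the same boundary shift is why the buffered-records test mentioned above would need its expected resume point adjusted.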