[ https://issues.apache.org/jira/browse/KAFKA-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875700#comment-17875700 ]
Matthias J. Sax commented on KAFKA-17299:
-----------------------------------------

{quote}We are noticing the consumption hangs, only when the first message in the current batch is faulty and fails to deserialise.
{quote}
Looking into the code, this bug might have been there for a long time, and it's actually a weird race condition – only if we exceed the input partition buffer size, and we have a corrupted record at the exact point where we go back below the buffer limit (and need to resume), might we accidentally skip resuming the partition. Of course, if there is a longer "burst" of failing records, the probability of hitting this bug is larger.

It should not be too hard to write a unit test for this (i.e., inside StreamTaskTest.java).

I am not sure, though, if we should change the condition to `<=`, as it would mean we call `resume()` for basically every input record... I believe we used `==` to avoid unnecessary overhead, as we are on the hot code path. [~lianetm] do you know what the overhead of calling `resume()` is? If it's cheap, it might be OK (I also don't want to complicate the code with premature optimizations...)

> Kafka streams consumer stops consumption
> ----------------------------------------
>
>                 Key: KAFKA-17299
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17299
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, streams
>    Affects Versions: 3.5.2, 3.6.2, 3.8.0, 3.7.1
>            Reporter: Laxman
>            Priority: Major
>
> We are using kafka clients version 3.5.2. However, based on our code review, the bug appears to exist in the current version as well.
> In one of our clusters, kafka streams consumption completely stops and doesn't recover even after a restart of the consumer instance/pod. These are our findings/observations from debugging:
> * We see some deserialisation errors while the streams application is consuming.
> * We configured LogAndContinueExceptionHandler as the exception handler for deserialisation failures.
> * Streams consumption doesn't stop on every deserialisation failure/error.
> * We notice the consumption hangs only when the first message in the current batch is faulty and fails to deserialise.
> We did a thorough inspection of the kafka clients code and debugged it by patching in additional logs; the following lines of code from StreamTask.java seem to be the issue.
> *Original* - original code: [StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]
> {code:java}
>             // after processing this record, if its partition queue's buffered size has been
>             // decreased to the threshold, we can then resume the consumption on this partition
>             if (recordInfo.queue().size() == maxBufferedSize) {
>                 mainConsumer.resume(singleton(partition));
>             }
> {code}
>
> *Patched* - the issue is resolved for us after this fix.
> {code:java}
>             // after processing this record, if its partition queue's buffered size has been
>             // decreased to the threshold, we can then resume the consumption on this partition
>             if (recordInfo.queue().size() <= maxBufferedSize) {
>                 mainConsumer.resume(singleton(partition));
>             }
> {code}
>
> With the original code, we resume consumption only when the queue size exactly matches the max buffered size. I think some record accounting goes wrong, especially when the first record in the batch fails to deserialise.
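
To make the race condition described in the comment above concrete, here is a toy, self-contained simulation in plain Java. It is not Kafka code: the class name, the queue contents, and the step that drops the corrupted record without reaching the resume check are assumptions made purely to illustrate the described behaviour, not the actual StreamTask control flow.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Toy simulation of the pause/resume bookkeeping discussed above.
// NOT Kafka code: the queue contents and the "skip the corrupted record
// without the resume check" step are assumptions for illustration only.
public class ResumeCheckSimulation {

    public static void main(final String[] args) {
        System.out.println("with == : resumed = " + simulate(true));   // false: partition stays paused
        System.out.println("with <= : resumed = " + simulate(false));  // true: partition recovers
    }

    private static boolean simulate(final boolean exactMatchOnly) {
        final int maxBufferedSize = 3;
        final Deque<String> queue = new ArrayDeque<>();

        // Buffer five records; the second one is "corrupted" and happens to be
        // the record whose removal lands the queue exactly on the threshold.
        for (int i = 0; i < 5; i++) {
            queue.add(i == 1 ? "corrupted" : "record-" + i);
        }
        boolean paused = queue.size() > maxBufferedSize;  // partition was paused while over the limit
        boolean resumed = false;

        while (!queue.isEmpty()) {
            final String record = queue.poll();
            if (record.equals("corrupted")) {
                // LogAndContinueExceptionHandler-style skip: the record is dropped
                // and this iteration never reaches the resume check below.
                continue;
            }
            // Normal processing path: check whether consumption may be resumed.
            final boolean belowThreshold = exactMatchOnly
                ? queue.size() == maxBufferedSize
                : queue.size() <= maxBufferedSize;
            if (paused && belowThreshold) {
                resumed = true;  // stand-in for mainConsumer.resume(singleton(partition))
                paused = false;
            }
        }
        return resumed;
    }
}
{code}

With `==` the queue size equals the threshold only right after the corrupted record is dropped, where no check runs, so the partition is never resumed; with `<=` the next successfully processed record resumes it. Note that in the real code the `<=` variant would invoke `resume()` for every processed record while the queue is at or below the threshold, which is the overhead concern raised in the comment above.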
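
Regarding the question about the cost of calling `resume()` on every record: one possible sketch, and only a sketch, would be to gate the call on whether the partition is actually paused. The helper class and names below are hypothetical and not part of StreamTask; whether `Consumer#paused()` is actually cheaper than an unconditional `resume()` is exactly the kind of measurement the question above asks for.

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, NOT part of StreamTask: it only issues resume() when
// the partition is currently paused, so a <= size check can stay on the hot
// path without triggering redundant resume() calls on every record.
public final class PartitionResumer {

    private final Consumer<byte[], byte[]> mainConsumer;
    private final int maxBufferedSize;

    public PartitionResumer(final Consumer<byte[], byte[]> mainConsumer,
                            final int maxBufferedSize) {
        this.mainConsumer = mainConsumer;
        this.maxBufferedSize = maxBufferedSize;
    }

    public void maybeResume(final TopicPartition partition, final int bufferedSize) {
        // paused() returns the partitions currently paused on this consumer;
        // resume() is only called when the partition is in that set and the
        // buffered size has dropped to or below the threshold.
        if (bufferedSize <= maxBufferedSize && mainConsumer.paused().contains(partition)) {
            mainConsumer.resume(Collections.singleton(partition));
        }
    }
}
{code}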