[ https://issues.apache.org/jira/browse/KAFKA-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899362#comment-17899362 ]
Laxman commented on KAFKA-17299:
--------------------------------
[~mjsax], [~lianetm]: Please review and merge this. I will follow up on any
comments you have.
> Kafka Streams consumer stops consumption
> ----------------------------------------
>
> Key: KAFKA-17299
> URL: https://issues.apache.org/jira/browse/KAFKA-17299
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.2, 3.6.2, 3.8.0, 3.7.1
> Reporter: Laxman
> Priority: Major
> Attachments: KAFKA-17299.patch
>
>
> We are using kafka-clients version 3.5.2; however, from our code review, the
> bug appears to exist in the current version as well.
> In one of our clusters, Kafka Streams consumption stops completely and does
> not recover even after a restart of the consumer instance/pod. These are our
> findings from debugging:
> * We see deserialisation errors while the streams application is consuming.
> * We have configured LogAndContinueExceptionHandler as the exception handler
> for deserialisation failures (see the configuration sketch after this list).
> * Consumption does not stop on every deserialisation failure.
> * Consumption hangs only when the first message in the current batch is
> faulty and fails to deserialise.
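>
> For reference, a minimal configuration sketch of the handler mentioned above
> (the class and config key are standard kafka-streams API; the application id
> and bootstrap servers are placeholders):
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
>
> public class StreamsHandlerConfigSketch {
>     public static void main(final String[] args) {
>         final Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         // Log deserialisation failures and continue, instead of failing the task.
>         props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>                   LogAndContinueExceptionHandler.class);
>     }
> }
> {code}
>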
> We inspected the kafka-clients code thoroughly and debugged by patching in
> additional logging; the following lines from StreamTask.java appear to be the
> cause.
> *Original* - the existing code:
> [StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]
> {code:java}
> // after processing this record, if its partition queue's buffered size has been
> // decreased to the threshold, we can then resume the consumption on this partition
> if (recordInfo.queue().size() == maxBufferedSize) {
>     mainConsumer.resume(singleton(partition));
> }
> {code}
>
> *Patched* - the issue was resolved for us with this fix:
> {code:java}
> // after processing this record, if its partition queue's buffered size has been
> // decreased to the threshold, we can then resume the consumption on this partition
> if (recordInfo.queue().size() <= maxBufferedSize) {
>     mainConsumer.resume(singleton(partition));
> }
> {code}
>
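> For context, the pause side of this accounting lives in the same file. A
> rough sketch, paraphrased from StreamTask#addRecords (the exact 3.5.2 form
> may differ slightly): consumption is paused once a poll pushes the queue
> past the threshold.
> {code:java}
> // sketch of StreamTask#addRecords (paraphrased, not verbatim):
> // pause the partition once its queue grows beyond maxBufferedSize
> final int newQueueSize = partitionGroup.addRawRecords(partition, records);
> if (newQueueSize > maxBufferedSize) {
>     mainConsumer.pause(singleton(partition));
> }
> {code}
> Pause fires on strictly-greater-than, but resume fires only on exact
> equality, so any step that shrinks the queue by more than one record skips
> the equality entirely.
>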
> In other words, consumption is resumed only when the queue size exactly
> equals the max buffered size. I think the record accounting goes wrong when
> the first record in the batch fails to deserialise: presumably the dropped
> record lets the queue size skip past the exact threshold in a single step, so
> the {{==}} check never fires and the partition stays paused.
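>
> To make that concrete, here is a minimal, self-contained simulation of the
> hypothesis (hypothetical names; a plain Deque stands in for the partition's
> record queue rather than the actual Kafka classes):
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Deque;
>
> public class ResumeCheckDemo {
>     public static void main(final String[] args) {
>         final int maxBufferedSize = 5;
>         final Deque<String> queue = new ArrayDeque<>();
>
>         // A poll overfills the queue to 6 records, so the partition is paused.
>         queue.add("corrupt-record"); // head of the batch fails to deserialise
>         for (int i = 1; i < 6; i++) {
>             queue.add("record-" + i);
>         }
>
>         // One processing step: the corrupt head is dropped by the handler and
>         // the next good record is processed, so the size falls from 6 to 4.
>         queue.poll(); // dropped by the deserialisation exception handler
>         queue.poll(); // processed normally
>
>         // The size was never observed at exactly 5, so the original check
>         // never resumes the partition; the patched check does.
>         System.out.println(queue.size() == maxBufferedSize); // false -> stays paused
>         System.out.println(queue.size() <= maxBufferedSize); // true  -> resumes
>     }
> }
> {code}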