[
https://issues.apache.org/jira/browse/KAFKA-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872200#comment-17872200
]
Laxman edited comment on KAFKA-17299 at 8/9/24 7:00 AM:
--------------------------------------------------------
{quote}Have you been able to determine the problem in the record accounting?
{quote}
Before we patched, the issue was reproducible consistently in our cluster. After
the patch, the application is able to make progress.
However, I couldn't locate the accounting problem in the Kafka code.
I'm new to contributing to the Kafka codebase, so could you please point me to the
existing unit/integration tests for this or similar scenarios?
> Kafka streams consumer stops consumption
> ----------------------------------------
>
> Key: KAFKA-17299
> URL: https://issues.apache.org/jira/browse/KAFKA-17299
> Project: Kafka
> Issue Type: Bug
> Components: consumer, streams
> Affects Versions: 3.5.2, 3.6.2, 3.8.0, 3.7.1
> Reporter: Laxman
> Priority: Major
>
> We are using Kafka clients version 3.5.2. However, from our code review, the bug
> appears to exist in the current version as well.
> In one of our clusters, Kafka Streams consumption completely stops and doesn't
> recover even after a restart of the consumer instance/pod. These are our
> findings/observations from debugging:
> * We see some deserialisation errors while the streams application is consuming.
> * We configured LogAndContinueExceptionHandler as the exception handler to handle
> deserialisation failures (a configuration sketch follows this list).
> * Streams consumption doesn't stop on every deserialisation failure/error.
> * We notice the consumption hangs only when the first message in the current
> batch is faulty and fails to deserialise.
> We did a thorough inspection of the Kafka clients code and debugged by patching it
> with additional logs. We found that the following lines of code from
> StreamTask.java seem to be the issue.
> *Original* - original code from
> [StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]:
> {code:java}
> // after processing this record, if its partition queue's buffered size has been
> // decreased to the threshold, we can then resume the consumption on this partition
> if (recordInfo.queue().size() == maxBufferedSize) {
>     mainConsumer.resume(singleton(partition));
> }
> {code}
>
> *Patched* - The issue was resolved for us with this fix.
> {code:java}
> // after processing this record, if its partition queue's buffered size has been
> // decreased to the threshold, we can then resume the consumption on this partition
> if (recordInfo.queue().size() <= maxBufferedSize) {
>     mainConsumer.resume(singleton(partition));
> }
> {code}
>
> The original code resumes consumption only when the queue size exactly matches the
> max buffered size. I think some record accounting has gone wrong, especially when
> there is an issue deserialising the first record in the batch.