[ https://issues.apache.org/jira/browse/KAFKA-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872200#comment-17872200 ]

Laxman edited comment on KAFKA-17299 at 8/9/24 7:00 AM:
--------------------------------------------------------

{quote}Have you been able to determine the problem in the record accounting?
{quote}
Before we patched, the issue was reproducible consistently in our cluster. 
After the patch, the application is able to make progress.

However, I couldn't locate the accounting problem in the Kafka code.

I'm new to contributing to the Kafka codebase, so could you please point me to 
the existing unit/integration tests for this or similar scenarios?

 



> Kafka streams consumer stops consumption
> ----------------------------------------
>
>                 Key: KAFKA-17299
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17299
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, streams
>    Affects Versions: 3.5.2, 3.6.2, 3.8.0, 3.7.1
>            Reporter: Laxman
>            Priority: Major
>
> We are using Kafka clients version 3.5.2; however, from our code review the 
> bug appears to exist in the current version as well. 
> In one of our clusters, Kafka Streams consumption completely stops and does 
> not recover even after a restart of the consumer instance/pod. These are our 
> findings/observations from our debugging:
>  * We see some deserialisation errors while the stream is consuming. 
>  * We configured LogAndContinueExceptionHandler as the exception handler to 
> handle deserialisation failures (see the configuration sketch after this 
> list). 
>  * Streams consumption doesn't stop on every deserialisation failure/error. 
>  * We notice the consumption hangs only when the first message in the 
> current batch is faulty and fails to deserialise.
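> 
> A minimal sketch of that handler configuration (the application id and 
> bootstrap servers here are placeholders, not our real values):
> {code:java}
> import java.util.Properties;
> 
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
> 
> Properties props = new Properties();
> // Placeholder application id and bootstrap servers.
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> // Log and skip records that fail to deserialise instead of failing the task.
> props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>           LogAndContinueExceptionHandler.class);
> {code}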
> We did a thorough inspection of the Kafka clients code and debugged by 
> patching in additional logs. We found that the following lines of code from 
> StreamTask.java appear to be the issue.
> *Original* - the code as it stands in 
> [StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]
> {code:java}
>             // after processing this record, if its partition queue's buffered size has been
>             // decreased to the threshold, we can then resume the consumption on this partition
>             if (recordInfo.queue().size() == maxBufferedSize) {
>                 mainConsumer.resume(singleton(partition));
>             }
> {code}
>  
> *Patched* - the issue is resolved for us after this fix.
> {code:java}
>             // after processing this record, if its partition queue's buffered size has been
>             // decreased to the threshold, we can then resume the consumption on this partition
>             if (recordInfo.queue().size() <= maxBufferedSize) {
>                 mainConsumer.resume(singleton(partition));
>             }
> {code}
>  
> The original code resumes consumption only when the queue size exactly 
> matches the max buffered size. I think some record accounting has gone 
> wrong, especially when there is an issue with deserialising the first record 
> in the batch: if the size ever skips past the threshold, the exact-match 
> check never fires and the partition stays paused (a sketch of this failure 
> mode follows).
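> 
> To illustrate the suspected failure mode, here is a minimal, standalone 
> model (not the actual Kafka code); the double-decrement on a corrupt head 
> record is an assumption, used only to show how an exact-match resume check 
> can be skipped once accounting goes wrong:
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Deque;
> 
> public class ResumeCheckSketch {
>     // Hypothetical threshold standing in for maxBufferedSize.
>     static final int MAX_BUFFERED_SIZE = 3;
> 
>     public static void main(String[] args) {
>         // Partition paused with 4 buffered records; the head is corrupt.
>         Deque<String> queue = new ArrayDeque<>();
>         queue.add("corrupt");
>         queue.add("record-1");
>         queue.add("record-2");
>         queue.add("record-3");
>         boolean paused = true;
> 
>         while (!queue.isEmpty()) {
>             String head = queue.poll();
>             if ("corrupt".equals(head) && !queue.isEmpty()) {
>                 // Assumed mis-accounting: the dropped record and the next
>                 // one both leave the queue before the resume check runs, so
>                 // size() goes from 4 straight to 2 and is never exactly 3.
>                 queue.poll();
>             }
>             if (queue.size() == MAX_BUFFERED_SIZE) { // original exact-match check
>                 paused = false;                      // never reached in this run
>             }
>             // The patched check, queue.size() <= MAX_BUFFERED_SIZE, would
>             // resume here on the first pass instead.
>         }
>         // Prints "still paused: true" - the partition never resumes even
>         // though the queue has fully drained.
>         System.out.println("still paused: " + paused);
>     }
> }
> {code}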



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
