[
https://issues.apache.org/jira/browse/KAFKA-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Laxman updated KAFKA-17299:
---------------------------
Description:
We are using Kafka clients version 3.5.2. However, based on our code review,
the bug appears to exist in the current version as well.
In one of our clusters, Kafka Streams consumption stops completely and doesn't
recover even after a restart of the consumer instance/pod. These are our
findings and observations from debugging:
* We see some deserialisation errors while the streams application is consuming.
* We configured LogAndContinueExceptionHandler as the exception handler to handle
deserialisation failures.
* Streams consumption doesn't stop on every deserialisation failure/error.
* Consumption hangs only when the first message in the current batch is faulty
and fails to deserialise.
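For reference, a minimal sketch of the handler configuration described above. The property key and handler class name are the ones Kafka Streams 3.5.x uses; the application id and bootstrap servers are hypothetical placeholders, and plain string keys are used so the snippet compiles without the Kafka jars.

```java
import java.util.Properties;

final class HandlerConfigSketch {

    // Builds the properties that would be passed to the KafkaStreams constructor.
    static Properties streamsProperties() {
        Properties props = new Properties();
        // Hypothetical values, for illustration only.
        props.put("application.id", "example-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Log records that fail to deserialise and continue, instead of failing the task.
        props.put("default.deserialization.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }
}
```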
We inspected the Kafka clients code thoroughly and debugged it by patching in
additional logs. The following lines of code from StreamTask.java seem to be
the issue.
*Original* - Code in 3.5.2
[StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]
{code:java}
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (recordInfo.queue().size() == maxBufferedSize) {
    mainConsumer.resume(singleton(partition));
}
{code}
*Patched* - This change resolved the issue for us.
{code:java}
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (recordInfo.queue().size() <= maxBufferedSize) {
    mainConsumer.resume(singleton(partition));
}
{code}
Consumption is resumed only when the queue size exactly matches the max
buffered size. I think some record accounting goes wrong, especially when the
first record in the batch fails to deserialise.
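To illustrate how an exact-equality check could miss the threshold, here is a simplified, hypothetical model. It is not the actual StreamTask or RecordQueue code. It assumes that records which fail to deserialise at the head of the queue are dropped together with the next processed record, so the queue size can shrink by more than one per step. The class and method names are invented for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ResumeConditionSketch {

    // Returns true if the resume check fires at least once while the queue drains.
    // deserialisable[i] == false marks a record that fails to deserialise.
    static boolean everResumes(boolean[] deserialisable, int maxBufferedSize,
                               boolean useLessOrEqual) {
        Deque<Boolean> queue = new ArrayDeque<>();
        for (boolean ok : deserialisable) {
            queue.add(ok);
        }
        while (!queue.isEmpty()) {
            // Assumption: faulty records at the head are skipped (log and continue).
            while (!queue.isEmpty() && !queue.peek()) {
                queue.poll();
            }
            // Process one good record, if any is left.
            if (!queue.isEmpty()) {
                queue.poll();
            }
            int size = queue.size();
            boolean resume = useLessOrEqual
                    ? size <= maxBufferedSize
                    : size == maxBufferedSize;
            if (resume) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Five buffered records, threshold 3, first two records are faulty.
        boolean[] faultyHead = {false, false, true, true, true};
        System.out.println(everResumes(faultyHead, 3, false)); // false: size goes 5 -> 2, never equals 3
        System.out.println(everResumes(faultyHead, 3, true));  // true: 2 <= 3
    }
}
```

In this model the size jumps from 5 to 2 in a single step, so the `==` check never sees 3, while the `<=` check fires on the first step. Whether the real code path skips records in this way is an assumption of the sketch, not something confirmed here.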
was:
{code:java}
// code placeholder
{code}
We are using Kafka clients version 3.5.2. However, based on our code review,
the bug appears to exist in the current version as well.
In one of our clusters, Kafka Streams consumption stops completely and doesn't
recover even after a restart of the consumer instance/pod. These are our
findings and observations from debugging:
* We see some deserialisation errors while the streams application is consuming.
* We configured LogAndContinueExceptionHandler as the exception handler to handle
deserialisation failures.
* Streams consumption doesn't stop on every deserialisation failure/error.
* Consumption hangs only when the first message in the current batch is faulty
and fails to deserialise.
We inspected the Kafka clients code thoroughly and debugged it by patching in
additional logs. The following lines of code from StreamTask.java seem to be
the issue.
*Original* - Code in 3.5.2
[StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]
{code:java}
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (recordInfo.queue().size() == maxBufferedSize) {
    mainConsumer.resume(singleton(partition));
}
{code}
*Patched* - This change resolved the issue for us.
{code:java}
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (recordInfo.queue().size() <= maxBufferedSize) {
    mainConsumer.resume(singleton(partition));
}
{code}
Consumption is resumed only when the queue size exactly matches the max
buffered size. I think some record accounting goes wrong, especially when the
first record in the batch fails to deserialise.
> Kafka streams consumer stops consumption
> ----------------------------------------
>
> Key: KAFKA-17299
> URL: https://issues.apache.org/jira/browse/KAFKA-17299
> Project: Kafka
> Issue Type: Bug
> Components: consumer, streams
> Affects Versions: 3.6.2, 3.8.0, 3.7.1
> Reporter: Laxman
> Priority: Major