[
https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474389#comment-17474389
]
Aleksandr Sorokoumov commented on KAFKA-6502:
---------------------------------------------
[~jadireddi] Are you actively working on this issue? If not, I would like to
give it a try.
> Kafka streams deserialization handler not committing offsets on error records
> -----------------------------------------------------------------------------
>
> Key: KAFKA-6502
> URL: https://issues.apache.org/jira/browse/KAFKA-6502
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Soby Chacko
> Assignee: Jagadesh Adireddi
> Priority: Minor
>
> See this StackOverflow issue:
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment:
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
> I am trying to use the LogAndContinueExceptionHandler on deserialization. It
> works fine when an error occurs by successfully logging and continuing.
> However, on a continuous stream of errors, it seems like these messages are
> not committed and on a restart of the application they reappear again. It is
> more problematic if I try to send the messages in error to a DLQ. On a
> restart, they are sent again to DLQ. As soon as I have a good record coming
> in, it looks like the offset moves further and not seeing the already logged
> messages again after a restart.
> I reproduced this behavior by running the sample provided here:
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to
> {{Serdes.Integer().getClass().getName()}} to force a deserialization error on
> input and reduced the commit interval to just 1 second. Also added the
> following to the config.
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
> LogAndContinueExceptionHandler.class);}}.
> It looks like when deserialization exceptions occur, this flag is never set
> to be true here:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
> It only becomes true once processing succeeds. That might be the reason why
> commit is not happening even after I manually call processorContext#commit().
--
This message was sent by Atlassian Jira
(v8.20.1#820001)