[ https://issues.apache.org/jira/browse/KAFKA-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18028599#comment-18028599 ]

Uladzislau Blok commented on KAFKA-19430:
-----------------------------------------

Hello. I did more tests locally, and it looks to me like this exception _could_
sometimes be retryable:
 # There is corruption on the disk. In this case the partition will fail with an
exception when trying to read the log file, and an error response is returned to
the consumer.
 # There is a network issue while transferring the message over the wire. For
more details, see https://issues.apache.org/jira/browse/KAFKA-19613

I have created a KIP to expose the topic partition and offset as part of this
exception, but I still see a few gaps. If we just skip the corrupted part, we can
lose the messages between the current offset and the corrupted part, which I
believe is not good.
I can see a new approach to fix this (inspired by Rust). As [~mjsax] said,
DeserializationExceptionHandler is the read-path handler, so we can try to catch
the exception when polling the messages and then propagate it to the current
handler.

Now:
{code:java}
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
    records = mainConsumer.poll(pollTime);
} catch (final InvalidOffsetException e) {
    resetOffsets(e.partitions(), e);
}
// any other exception thrown by poll() propagates and fails the stream thread
return records;
{code}
Proposal:
{code:java}
Result<ConsumerRecords<byte[], byte[]>, Exception> result =
        Result.ok(ConsumerRecords.empty());
try {
    result = Result.ok(mainConsumer.poll(pollTime));
} catch (final InvalidOffsetException e) {
    resetOffsets(e.partitions(), e);
} catch (final Exception e) {
    // capture the failure instead of letting it kill the stream thread
    result = Result.err(e);
}
return result;
{code}
This way we capture either the records or the exception, and can 'unpack' the
result inside the exception handler.
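For illustration, here is a minimal sketch of what such a Result type could look
like. The class and its ok/err/unwrapOrThrow methods are hypothetical (the naming
just borrows Rust's vocabulary) and are not existing Kafka API:
{code:java}
// Hypothetical minimal Result type, loosely modelled on Rust's Result<T, E>.
// Nothing here exists in Kafka today; all names are illustrative only.
public final class Result<T, E extends Exception> {
    private final T value; // non-null on success
    private final E error; // non-null on failure

    private Result(final T value, final E error) {
        this.value = value;
        this.error = error;
    }

    public static <T, E extends Exception> Result<T, E> ok(final T value) {
        return new Result<>(value, null);
    }

    public static <T, E extends Exception> Result<T, E> err(final E error) {
        return new Result<>(null, error);
    }

    public boolean isErr() {
        return error != null;
    }

    public E error() {
        return error;
    }

    // Returns the value on success, or re-throws the captured exception,
    // so the caller can route it to a handler instead of dying.
    public T unwrapOrThrow() throws E {
        if (error != null) {
            throw error;
        }
        return value;
    }
}
{code}
The call site could then check result.isErr() and hand result.error() to the
DeserializationExceptionHandler (or a dedicated handler) before deciding whether
to retry, skip, or fail.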
[~mjsax] [~lianetm] Any thoughts?

 

> Don't fail on RecordCorruptedException
> --------------------------------------
>
>                 Key: KAFKA-19430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Uladzislau Blok
>            Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/524]
> Currently, the existing `DeserializationExceptionHandler` is applied when 
> de-serializing the record key/value byte[] inside Kafka Streams. This implies 
> that a `RecordCorruptedException` is not handled.
> We should explore not letting Kafka Streams crash, but maybe retrying this error 
> automatically (as `RecordCorruptedException extends RetriableException`), and 
> finding a way to pump the error into the existing exception handler.
> If the error is transient, users can still use `REPLACE_THREAD` in the 
> uncaught exception handler, but this is a rather heavyweight approach.


