[ 
https://issues.apache.org/jira/browse/KAFKA-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013562#comment-18013562
 ] 

Matthias J. Sax commented on KAFKA-19430:
-----------------------------------------

Well, that's all up for discussion I guess :) – And there are multiple issues 
we need to consider.
 # The idea to maybe reuse `DeserializationExceptionHandler` was to avoid 
having too many callback interfaces and to keep the API surface area small. 
Even if this error is not really about deserialization, it is about a broken 
read path, so if we squint a little bit, it might be fine to reuse (misuse?) 
the existing handler? (See the first sketch after this list for what the 
current handler contract looks like.) (Btw: there is actually another ticket 
to add RETRY to the other handlers, 
https://issues.apache.org/jira/browse/KAFKA-17441, so adding RETRY and reusing 
`DeserializationExceptionHandler` for this case is for sure a valid option. 
But I am not even sure if we would want a RETRY option (cf. below) – also, the 
user wanted to be able to skip over this error via CONTINUE.)
 # Of course, there is some mismatch because `CorruptRecordException` is for a 
batch of records, and thus, we don't have a `ConsumerRecord` to pass into the 
handler...
 # I was also looking into the code a little bit more, and into the originally 
reported stack trace, and it seems that we might not even get 
`CorruptRecordException` directly: `CorruptRecordException` is only used 
internally, and the consumer throws a `KafkaException` from `poll()` for this 
case instead (see the second sketch below) – this would make the case even 
more difficult to handle, and it makes it questionable whether RETRY is 
actually a valid option?
 # In the end, yes, we could also have one unified handler for the read and 
write paths, but would it really be easier to use? What API would you propose 
to unify both?
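
For reference, here is a minimal sketch of what the current read-path handler 
contract looks like, using the classic `handle(ProcessorContext, ...)` 
signature (newer releases add an `ErrorHandlerContext`-based variant). The 
class name is made up, and the RETRY response from KAFKA-17441 does not exist 
yet, so it only shows up as a comment:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical handler that skips over records it cannot deserialize.
public class LogAndSkipHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // CONTINUE drops the single record that could not be deserialized;
        // FAIL kills the stream thread. A RETRY response (cf. KAFKA-17441)
        // would be a new enum value and does not exist today.
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure for this sketch
    }
}
```

Note that such a handler is plugged in via 
`default.deserialization.exception.handler` and is only ever invoked with a 
single `ConsumerRecord`, which is exactly the mismatch from point 2.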
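
And to illustrate point 3 with a plain consumer, this is roughly the shape of 
the failure (a sketch only, class and method names made up; the exact 
exception and cause chain would need to be confirmed against the consumer 
internals):

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;

public class CorruptBatchProbe {

    // Returns true if poll() completed without throwing.
    static boolean pollOnce(final Consumer<byte[], byte[]> consumer) {
        try {
            final ConsumerRecords<byte[], byte[]> records =
                consumer.poll(Duration.ofMillis(100));
            // ... hand `records` to the stream task ...
            return true;
        } catch (final KafkaException e) {
            // For a corrupt batch this is a generic KafkaException (with
            // CorruptRecordException at best as the cause), and as far as I
            // can tell the fetch position is not advanced past the bad batch,
            // so simply calling poll() again would hit the same error unless
            // we explicitly seek past the corrupted offset.
            return false;
        }
    }
}
```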

I also don't have all the answers, and that is what I meant by "I just don't 
know how complex it will be". Happy to keep brainstorming a little bit, but it 
could also turn out that this issue ends up as a "won't fix", as it might be 
too complex (or we could also change the consumer to throw 
`CorruptRecordException` directly). – We might also need a KIP for this if we 
make larger changes or touch any public APIs.

Maybe [~lianetm] and/or [~kirktrue] could comment, as they know the consumer 
internals much better. It would be good to better understand what internal 
state the consumer is in when such an error happens. – But even if the error 
is not retriable directly, we could always close the consumer, do some 
cleanup, create a new one, and resume from the last committed offset (well, 
that's technically already possible via the REPLACE_THREAD option of the 
uncaught exception handler). Btw: if the consumer is in a non-reusable state 
after this error, we would also need to do some clever cleanup for the 
CONTINUE case...
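
For completeness, the heavyweight workaround that already exists today looks 
roughly like this (a sketch only; the helper name is made up and the 
`instanceof` check is purely illustrative):

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ReplaceThreadOnReadError {

    // Install before streams.start().
    static void install(final KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            if (exception instanceof KafkaException) {
                // Replace the dead stream thread; the new thread resumes from
                // the last committed offset. For a truly corrupt batch it
                // would presumably hit the same error again unless something
                // seeks past it.
                return StreamThreadExceptionResponse.REPLACE_THREAD;
            }
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
    }
}
```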

> Don't fail on RecordCorruptedException
> --------------------------------------
>
>                 Key: KAFKA-19430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Uladzislau Blok
>            Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/524]
> Currently, the existing `DeserializationExceptionHandler` is only applied when 
> de-serializing the record key/value byte[] inside Kafka Streams. This implies 
> that a `RecordCorruptedException` is not handled.
> We should explore not letting Kafka Streams crash, but maybe retrying this 
> error automatically (as `RecordCorruptedException extends RetriableException`), 
> and finding a way to pump the error into the existing exception handler.
> If the error is transient, users can still use `REPLACE_THREAD` in the 
> uncaught exception handler, but this is a rather heavyweight approach.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
