[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006992#comment-17006992 ]

Guozhang Wang commented on KAFKA-8037:
--------------------------------------

For non-global KTables, it now makes me wonder whether our current mechanism for 
restoring source KTables is sound: upon (re-)starting the application, there 
should only be three cases (see the sketch after this list): 

1) the local checkpointed offset is equal to the committed offset, in which 
case no restoration is needed at all.
2) the local checkpointed offset is smaller than the committed offset (including 
the case where no checkpoint offset exists, in which case it is effectively the 
beginning offset), in which case we can safely restore up to the committed 
offset, since those records already went through the serde during normal 
processing in the previous run.
3) there is no committed offset, in which case there MUST be no checkpointed 
offset either. This is the case I'm now wondering about: today we simply 
restore up to the end offset read once at startup, which can load bad data, 
since those records never went through the normal serde path in a previous run. 
Should we really do that, or should we just transition to normal processing 
right away?

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Patrik Kleindl
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for the 
> GlobalKTable case). It's unclear to me atm how this issue could be addressed 
> for KTables, though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



