[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042172#comment-17042172 ]

Guozhang Wang commented on KAFKA-8037:
--------------------------------------

This is an interesting idea. As for whether it blocks on KAFKA-9368, I think
it would not: today, while restoring, we do not modify the per-processor
stream-time or partition-time at all, and the partition-time is initialized
from committed offsets as we fixed recently, so the stream-time, which would
be later than the partition-time, could just be initialized, right?

I'm wondering how large a portion of bad data could be incurred in practice
during normal processing. If we expect it to be rare, then instead of creating
an "inverse-changelog" we could just store the list of "inverse-offsets" as
part of the commit metadata, which today only stores the partition-time. Upon
restoring, we can read that list of offsets from the committed offset metadata
and simply skip restoring the corresponding offsets. This is a simpler
approach, but of course if the list is long, storing it in a single record
would exceed the offset-metadata size limit.
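To make the "inverse-offset" idea concrete, here is a minimal sketch of the restore-side skip logic. All names and the comma-separated metadata format are illustrative assumptions, not Kafka Streams APIs:

```java
import java.util.*;

// Hypothetical sketch of the "inverse-offset" proposal: offsets of dropped
// (corrupted) records are recorded in the commit metadata, and restore skips
// exactly those changelog offsets. Names and format are assumptions.
public class RestoreSkipSketch {

    // Parse the comma-separated "inverse-offset" list assumed to be stored
    // alongside the partition-time in the commit metadata.
    static Set<Long> parseSkippedOffsets(String commitMetadata) {
        Set<Long> skipped = new HashSet<>();
        if (commitMetadata == null || commitMetadata.isEmpty()) {
            return skipped;
        }
        for (String s : commitMetadata.split(",")) {
            skipped.add(Long.parseLong(s.trim()));
        }
        return skipped;
    }

    // Restore only the changelog offsets that are not in the skip set.
    static List<Long> offsetsToRestore(List<Long> changelogOffsets,
                                       Set<Long> skipped) {
        List<Long> result = new ArrayList<>();
        for (long offset : changelogOffsets) {
            if (!skipped.contains(offset)) {
                result.add(offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<Long> skipped = parseSkippedOffsets("3, 7");
        List<Long> restored = offsetsToRestore(
                Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L), skipped);
        System.out.println(restored); // offsets 3 and 7 are skipped
    }
}
```

The size concern above is visible here too: the skip list is serialized into a single metadata string, which is bounded by the broker's offset-metadata size limit.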

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for the 
> GlobalKTable case). It's unclear to me at the moment how this issue could be 
> addressed for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
