[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007093#comment-17007093 ]

Guozhang Wang commented on KAFKA-8037:
--------------------------------------

[~pkleindl] For case 2), all records before the committed offsets have been 
processed normally, i.e. deserialized first, then processed, and then used to 
update the store; hence we are assured they are not bad data, since otherwise 
the deserialization exception handler would have triggered.
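
For reference, a minimal sketch of how such a handler is wired in (the 
application id and bootstrap server below are placeholders); with 
LogAndContinueExceptionHandler, a record that fails deserialization during 
normal processing is logged and skipped before it can ever update the store:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    public class HandlerConfigSketch {
        public static Properties streamsConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // During normal processing every record is deserialized before it
            // updates the store, so a corrupted record either triggers this
            // handler (and is skipped) or never reaches the store.
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                      LogAndContinueExceptionHandler.class);
            return props;
        }
    }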

[~ableegoldman] That's a good point (we thought about this in 
https://issues.apache.org/jira/browse/KAFKA-7663 as well), but I'm thinking 
more from the timestamp-synchronization point of view: if we always restore 
the source KTable to some arbitrary point before normal processing starts, 
then e.g. in a stream-table join, when the next record from the stream is 
fetched with timestamp T1, it likely has to be joined against a table that 
has already been restored up to some timestamp T2 much larger than T1. Only 
during normal processing do we select the next record across input partitions 
so as to synchronize their times. In cases 1) and 2), though, we know that 
the table has been processed normally up to the committed offset (for both 
the source table and the source stream), so restoring it to that point is 
valid.
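
To make the timing concern concrete, a rough sketch of a stream-table join 
(the topic names "users", "clicks", and "enriched-clicks" are invented for 
illustration):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    public class StreamTableJoinSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Source KTable: during normal processing, Streams picks the next
            // record across input partitions by timestamp, keeping the table
            // roughly in sync with the stream.
            KTable<String, String> users =
                    builder.table("users", Consumed.with(Serdes.String(), Serdes.String()));

            KStream<String, String> clicks =
                    builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()));

            // If "users" were restored past the committed offset (up to T2), a
            // click with timestamp T1 << T2 would be joined against "future"
            // table state instead of the state as of T1.
            clicks.join(users, (click, user) -> click + "/" + user)
                  .to("enriched-clicks");

            builder.build();
        }
    }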



> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table 
> state is lost and restored from the changelog topic, the corrupted record 
> may be copied into the store, because on restore plain bytes are copied.
>
> If the KTable is used in a join, an internal `store.get()` call to look up 
> the record would fail with a deserialization exception if the value part 
> cannot be deserialized.
>
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for 
> the GlobalKTable case). It's unclear to me atm how this issue could be 
> addressed for KTables, though.
>
> Note that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic), and thus the 
> corrupted record would not be written into the changelog.
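
A minimal sketch of the setup the ticket describes (topic and store names are 
invented); the comments restate the reported behavior:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    public class SourceTableSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Source KTable: on first read, a corrupted record is dropped by
            // the configured deserialization exception handler; on restore,
            // however, plain bytes are copied into the store without running
            // the handler, so the bad record can re-enter the store and later
            // fail a store.get() lookup during a join.
            KTable<String, Long> table = builder.table(
                    "input-topic",
                    Consumed.with(Serdes.String(), Serdes.Long()),
                    Materialized.as("source-table-store"));

            builder.build();
        }
    }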



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
