[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16785420#comment-16785420 ]
Patrik Kleindl commented on KAFKA-8037:
---------------------------------------

[~guozhang] Does that work too if LogAndContinueExceptionHandler is used, or will that "swallow" the error you mentioned above, which is needed to know when to quit restoring?
I am trying to work my way through the code and reproduce this with a test for the global state stores, but I might need some help there.
Just for my understanding: the TopologyDriver can't cover this scenario, or did I miss something there?
Currently I am trying something similar to GlobalStateManagerImplTest.shouldSkipNullKeysWhenRestoring, but I was not really successful.

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>
> If an input topic contains bad data, users can specify a
> `deserialization.exception.handler` to drop corrupted records on read.
> However, this mechanism may be bypassed on restore. Assume a
> `builder.table()` call reads and drops a corrupted record. If the table state
> is lost and restored from the changelog topic, the corrupted record may be
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up the
> record would fail with a deserialization exception if the value part cannot
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for
> the GlobalKTable case). It's unclear to me at the moment how this issue could
> be addressed for KTables, though.
> Note that user state stores are not affected, because they always have a
> dedicated changelog topic (and don't reuse an input topic), and thus the
> corrupted record would not be written into the changelog.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
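
The bypass described in the quoted issue can be illustrated with a small stand-alone model. This is only a sketch and deliberately does not use the Kafka Streams API: `Rec`, `deserialize`, `processWithHandler`, `restoreRaw`, and `get` are hypothetical names, and the integer-parsing "deserializer" is an assumption made purely for illustration. It shows a read path that drops records it cannot deserialize, a restore path that copies plain bytes, and a later lookup that fails on the restored record.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoreSketch {

    /** Hypothetical stand-in for a record in the source topic (raw bytes). */
    static final class Rec {
        final String key;
        final byte[] value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical deserializer: only decimal integers are valid values. */
    static int deserialize(byte[] raw) {
        return Integer.parseInt(new String(raw, StandardCharsets.UTF_8));
    }

    /** Normal read path: records that fail to deserialize are dropped. */
    static Map<String, byte[]> processWithHandler(List<Rec> topic) {
        Map<String, byte[]> store = new HashMap<>();
        for (Rec r : topic) {
            try {
                deserialize(r.value);          // validate on read
                store.put(r.key, r.value);
            } catch (NumberFormatException e) {
                // models a "log and continue" policy: skip the bad record
            }
        }
        return store;
    }

    /** Restore path: plain bytes are copied, nothing is deserialized. */
    static Map<String, byte[]> restoreRaw(List<Rec> topic) {
        Map<String, byte[]> store = new HashMap<>();
        for (Rec r : topic) {
            store.put(r.key, r.value);
        }
        return store;
    }

    /** Lookup as a join might do it: deserializes the stored bytes. */
    static int get(Map<String, byte[]> store, String key) {
        return deserialize(store.get(key));
    }

    public static void main(String[] args) {
        List<Rec> topic = Arrays.asList(new Rec("a", "1"), new Rec("b", "not-a-number"));

        Map<String, byte[]> live = processWithHandler(topic);
        Map<String, byte[]> restored = restoreRaw(topic);

        System.out.println("live store has b:     " + live.containsKey("b"));
        System.out.println("restored store has b: " + restored.containsKey("b"));
        try {
            get(restored, "b");
        } catch (NumberFormatException e) {
            System.out.println("lookup on restored store failed");
        }
    }
}
```

In this model the corrupted record never reaches the store on the normal read path, but it does after a restore, and the failure only surfaces later at lookup time. Whether a handler such as LogAndContinueExceptionHandler could be applied during restore in Kafka Streams itself is exactly the open question in the comment above; the sketch does not answer it.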