[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163204#comment-17163204 ]
Guozhang Wang commented on KAFKA-8037:
--------------------------------------

Here's a wild thought following [~agavra]'s last comment: what if we also skip serialization during normal processing? More specifically:

1) We deserialize the raw bytes, and trigger the deserialization exception handler when necessary.

2) Then we put the raw bytes into the store. This is quite tricky in our current code hierarchy; here is a very hacky way off the top of my head: we use two serdes in the KTableSource state store. The first serde is the registered one and would be used to deserialize the bytes from the store in the getter. The second serde is used to serialize the record in `MeteredXYStore`; it is really just a "backdoor" that remembers the raw bytes before they are deserialized in the source node and then returns those raw bytes in the `serialize` call.

3) When forwarding to the downstream processor, we forward the deserialized record from 1); this part does not change from today's code path.

By doing so we 1) make the behavior of normal processing and restoration consistent (i.e. we also do the deserialization during restoration), and 2) avoid any side-effects of deserialization.

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a
> `deserialization.exception.handler` to drop corrupted records on read.
> However, this mechanism may be by-passed on restore. Assume a
> `builder.table()` call reads and drops a corrupted record. If the table state
> is lost and restored from the changelog topic, the corrupted record may be
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the
> record would fail with a deserialization exception if the value part cannot
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed
> for KTables though.
> Note, that user state stores are not affected, because they always have a
> dedicated changelog topic (and don't reuse an input topic) and thus the
> corrupted record would not be written into the changelog.
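
For illustration only, the two-serde "backdoor" idea from the comment can be sketched in plain Java. This is a rough sketch, not Kafka Streams code: `ValueDeserializer`, `RememberingSerde`, `INT_PARSER` and `process` are hypothetical names, the store is a plain `Map`, and catching the exception and returning `null` stands in for a skip-style deserialization exception handler.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class RawBytesBackdoorSketch {

    /** Hypothetical stand-in for a value deserializer (not Kafka's interface). */
    interface ValueDeserializer<T> {
        T deserialize(byte[] data);
    }

    /** Hypothetical "backdoor" serde: remembers the raw bytes it last deserialized. */
    static final class RememberingSerde<T> {
        private final ValueDeserializer<T> inner;
        private byte[] lastRaw;

        RememberingSerde(ValueDeserializer<T> inner) {
            this.inner = inner;
        }

        T deserialize(byte[] raw) {
            T value = inner.deserialize(raw); // may throw on corrupted input
            lastRaw = raw;                    // remembered only after success
            return value;
        }

        byte[] serialize(T value) {
            // Ignores the object and hands back the remembered raw bytes.
            return lastRaw;
        }
    }

    /** Parses a UTF-8 decimal integer; throws NumberFormatException on bad data. */
    static final ValueDeserializer<Integer> INT_PARSER =
        data -> Integer.parseInt(new String(data, StandardCharsets.UTF_8).trim());

    /**
     * Steps 1) to 3) from the comment: deserialize, store the raw bytes,
     * return the deserialized value to forward downstream.
     * Returns null when the record is dropped (assumed skip-style handler).
     */
    public static Integer process(Map<String, byte[]> store, String key, byte[] raw) {
        RememberingSerde<Integer> serde = new RememberingSerde<>(INT_PARSER);
        Integer value;
        try {
            value = serde.deserialize(raw);       // step 1
        } catch (RuntimeException e) {
            return null;                          // corrupted record is dropped
        }
        store.put(key, serde.serialize(value));   // step 2: raw bytes, no re-serialization
        return value;                             // step 3: forward deserialized record
    }

    public static void main(String[] args) {
        Map<String, byte[]> store = new HashMap<>();
        byte[] good = " 42 ".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "oops".getBytes(StandardCharsets.UTF_8);

        System.out.println(process(store, "a", good));           // prints 42
        System.out.println(process(store, "b", bad));            // prints null
        System.out.println(Arrays.equals(store.get("a"), good)); // prints true
        System.out.println(store.containsKey("b"));              // prints false
    }
}
```

In this sketch the store ends up with exactly the bytes that were read (including the surrounding spaces the parser trims), and the corrupted record never reaches the store, which is the consistency between normal processing and restoration that the comment is after.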