[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161796#comment-17161796 ]
Matthias J. Sax commented on KAFKA-8037:
----------------------------------------

I believe that the optimization is still worth having. That the input topic is log-compacted is not a crazy assumption IMHO. And in fact, we have had this optimization for a long time and I am not aware of any data loss issues due to incorrect retention configuration.

I am also less concerned about Consumed/Materialized. Often users might specify the same serde in both places, but it should be possible to detect this case. Hence, instead of a blind check of whether we get one or two serdes, we should check whether we got the same or different serdes.

Similar for deserialization exception handling: if the default one is used, we _know_ that no corrupted data can be in the store and thus it seems safe to just do blind bulk loading – what I obviously don't know is how many people might use the default.

I also strongly believe that async serdes should be declared an anti-pattern. Instead of trying to make them work, it might be more reasonable to tell people that they are a bad idea and not supported.

In general, I think it makes sense to allow users to opt in or out on a per-table basis via the API. Whether we need a `useSourceAsChangelog` I am not sure. We could also educate people to use `builder.stream().toTable()` – if possible, it seems desirable not to increase the API if we don't need to.

For GlobalKTables, or actually global state stores, we might want to reconsider the design: note that for a GlobalKTable no intermediate processing happens. Only for global state stores can people plug in a custom Processor (which is currently not allowed to do any actual processing). Having an extra changelog topic to begin with does not really make sense (in contrast to the KTable case).
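The two conditions discussed above can be combined into one decision: bulk loading raw bytes on restore is only safe when the serde used on read and the serde used for the store are the same, and the default deserialization exception handler is in use (so no corrupted record could have been silently dropped on the regular read path). A minimal plain-Java sketch of that check; the class and method names are illustrative assumptions, not the Kafka Streams API:

```java
import java.util.Objects;

// Hypothetical sketch of the restore-strategy decision discussed above.
// Not the actual Kafka Streams API; names are made up for illustration.
public class RestoreStrategy {

    /**
     * Blindly copying raw bytes during restore is safe only if:
     *  1. the serde specified on read (Consumed) and the serde specified
     *     for the store (Materialized) are the same, and
     *  2. the default deserialization exception handler is used, so no
     *     corrupted record could have been dropped on the regular path
     *     (i.e., the store bytes match the topic bytes).
     */
    public static boolean canBulkLoad(Class<?> consumedSerde,
                                      Class<?> materializedSerde,
                                      boolean usesDefaultExceptionHandler) {
        return Objects.equals(consumedSerde, materializedSerde)
                && usesDefaultExceptionHandler;
    }

    public static void main(String[] args) {
        // Same serde in both places, default handler: bulk load is safe.
        System.out.println(canBulkLoad(String.class, String.class, true));
        // A custom handler may have dropped corrupted records on read,
        // so restore must go through regular deserializing processing.
        System.out.println(canBulkLoad(String.class, String.class, false));
        // Different serdes: the store bytes differ from the topic bytes.
        System.out.println(canBulkLoad(String.class, Integer.class, true));
    }
}
```

The comparison here is simplified to serde classes; a real implementation would need to decide what "same serde" means (same instance, same class, or same configuration).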
However, we could make the API simpler: instead of letting users define the "store maintainer Processor," we would always provide it (including for the PAPI). To allow users to preprocess the data, we would let them plug in a processor between the source and the store maintainer instead. In the DSL, we never plug in an intermediate processor but only have source + store maintainer – if we detect this pattern, we know that the data was not modified and we can do some optimizations during state restore. For all other cases, we need to do the restore as regular processing, as suggested by Guozhang.

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a
> `deserialization.exception.handler` to drop corrupted records on read.
> However, this mechanism may be by-passed on restore. Assume a
> `builder.table()` call reads and drops a corrupted record. If the table state
> is lost and restored from the changelog topic, the corrupted record may be
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up the
> record would fail with a deserialization exception if the value part cannot
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for the
> GlobalKTable case). It's unclear to me atm how this issue could be addressed
> for KTables though.
> Note that user state stores are not affected, because they always have a
> dedicated changelog topic (and don't reuse an input topic) and thus the
> corrupted record would not be written into the changelog.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
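The restore gap described in the issue above – "on restore plain bytes are copied" – can be contrasted with restore-as-regular-processing in a small plain-Java sketch (no Kafka dependency; all names are hypothetical). A validating restore deserializes each changelog record and skips corrupted ones, mirroring what a `deserialization.exception.handler` such as LogAndContinue does on the regular read path:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a restore loop that validates each record
// instead of copying raw bytes. Names are hypothetical, not Kafka API.
public class ValidatingRestore {

    /**
     * Restore a store from changelog bytes, deserializing each value.
     * Records that fail to deserialize are dropped, mirroring a
     * log-and-continue deserialization exception handler; a byte-copying
     * restore would instead put the corrupted bytes into the store and
     * fail later, e.g. on a `store.get()` lookup during a join.
     */
    static Map<String, Integer> restoreWithValidation(Map<String, byte[]> changelog) {
        Map<String, Integer> store = new HashMap<>();
        for (Map.Entry<String, byte[]> rec : changelog.entrySet()) {
            try {
                int value = Integer.parseInt(
                        new String(rec.getValue(), StandardCharsets.UTF_8));
                store.put(rec.getKey(), value);
            } catch (NumberFormatException corrupted) {
                // Skip the corrupted record rather than copying its bytes.
            }
        }
        return store;
    }
}
```

Doing this for every restore is what makes it as slow as regular processing; the optimization debated above is about detecting the cases where this per-record validation can be skipped safely.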