[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161796#comment-17161796
 ] 

Matthias J. Sax commented on KAFKA-8037:
----------------------------------------

I believe that the optimization is still worth having. Assuming that the input topic is 
log-compacted is not a crazy assumption IMHO. And in fact, we have had this 
optimization for a long time and I am not aware of any data loss issues due to 
incorrect retention configuration.

I am also less concerned about Consumed/Materialized. Often users might 
specify the same serde in both places, and it should be possible to detect this 
case. Hence, instead of blindly checking whether we got one or two serdes, we 
should check whether we got the same or different serdes.
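For illustration, such a check could look roughly like the following sketch (plain Java; the names and the equality criterion are hypothetical, not actual Kafka Streams internals):

```java
// Hypothetical sketch: gate the bulk-load optimization on whether the serde
// from Consumed and the serde from Materialized are provably the same.
public class SerdeCheck {
    static boolean sameSerde(Object consumedSerde, Object materializedSerde) {
        // If only one serde was given, the other side falls back to it anyway.
        if (consumedSerde == null || materializedSerde == null) {
            return true;
        }
        // Same instance, or same (unconfigured) class, counts as "same serde";
        // anything else must be treated as potentially different.
        return consumedSerde == materializedSerde
                || consumedSerde.getClass() == materializedSerde.getClass();
    }

    public static void main(String[] args) {
        String serde = "StringSerde";  // stand-ins for real serde objects
        System.out.println(sameSerde(serde, serde));              // same instance
        System.out.println(sameSerde(serde, null));               // one side defaulted
        System.out.println(sameSerde(serde, Integer.valueOf(1))); // different serdes
    }
}
```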

Similarly for deserialization exception handling: if the default handler is used, we 
_know_ that no corrupted data can be in the store and thus it seems safe to 
just do blind bulk loading – what I obviously don't know is how many people 
use the default. I also strongly believe that async serdes should be 
declared an anti-pattern. Instead of trying to make them work, it might be more 
reasonable to tell people that they are a bad idea and not supported.
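As a toy illustration of the underlying problem (plain Java, no Kafka code): on the normal read path a deserialization handler drops corrupted records, while a byte-level restore copies them straight back into the store:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of the restore problem: the normal read path deserializes each
// record and drops corrupted ones, while a byte-level restore copies the raw
// bytes -- corrupted records included.
public class RestoreBypass {
    // Stand-in for a deserializer plus an exception handler configured to
    // skip corrupted records (returns null instead of throwing).
    static Integer tryDeserialize(byte[] value) {
        try {
            return Integer.parseInt(new String(value, StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        byte[][] topic = { "1".getBytes(StandardCharsets.UTF_8),
                           "corrupt".getBytes(StandardCharsets.UTF_8),
                           "3".getBytes(StandardCharsets.UTF_8) };

        // Normal processing: the corrupted record is dropped on read.
        List<Integer> table = new ArrayList<>();
        for (byte[] v : topic) {
            Integer d = tryDeserialize(v);
            if (d != null) table.add(d);
        }
        System.out.println("processed: " + table);  // [1, 3]

        // Restore: plain bytes are copied, so the corrupted record ends up in
        // the store, and a later store.get() that deserializes it would fail.
        List<byte[]> restored = new ArrayList<>();
        for (byte[] v : topic) restored.add(v);
        System.out.println("restored records: " + restored.size());  // 3
    }
}
```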

In general, I think it makes sense to allow users to opt in/out on a per-table 
basis via the API. I am not sure whether we need a `useSourceAsChangelog`. We 
could also educate people to use `builder.stream().toTable()` – if possible, it 
seems desirable not to increase the API surface if we don't need to.

For GlobalKTables, or actually global state stores, we might want to reconsider 
the design: note that for GlobalKTable no intermediate processing happens. 
Only for global state stores can people plug in a custom Processor (which is 
currently not allowed to do any actual processing). Having an extra changelog 
topic to begin with does not really make sense (in contrast to the KTable 
case). However, we could make the API simpler: instead of letting users define 
the "store maintainer Processor", we would always provide it (including for the 
PAPI) – to allow users to preprocess the data, we would let them plug in a 
processor between the source and the store maintainer instead. In the DSL, we 
never plug in an intermediate processor but only have source+storeMaintainer – 
if we detect this pattern, we know that the data was not modified and we can do 
some optimizations during state restore. For all other cases, we need to do the 
restore as regular processing, as suggested by Guozhang.
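The pattern detection could be as simple as checking the shape of the store's sub-topology; a sketch with hypothetical names (again not actual Kafka Streams internals):

```java
import java.util.List;

// Hypothetical sketch of the pattern detection described above: if a global
// store's sub-topology is exactly source -> store maintainer, the bytes in
// the topic equal the bytes in the store and a raw bulk restore is safe; any
// user processor in between forces restoring via regular processing.
public class RestoreStrategy {
    enum Node { SOURCE, USER_PROCESSOR, STORE_MAINTAINER }

    static boolean canBulkRestore(List<Node> subTopology) {
        return subTopology.equals(List.of(Node.SOURCE, Node.STORE_MAINTAINER));
    }

    public static void main(String[] args) {
        // DSL case: no intermediate processor, data unmodified -> bulk restore.
        System.out.println(canBulkRestore(
                List.of(Node.SOURCE, Node.STORE_MAINTAINER)));                       // true
        // Case with a user preprocessing step -> restore as regular processing.
        System.out.println(canBulkRestore(
                List.of(Node.SOURCE, Node.USER_PROCESSOR, Node.STORE_MAINTAINER))); // false
    }
}
```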

 

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for the 
> GlobalKTable case). It's unclear to me atm how this issue could be addressed 
> for KTables, though.
> Note that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
