[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163126#comment-17163126 ]

Matthias J. Sax commented on KAFKA-8037:
----------------------------------------

{quote}It is currently true that this is _only_ used to apply the source 
changelog optimization.
{quote}
How so? Our optimizer does more, e.g., merging redundant repartition 
topics... And in contrast to removing/deprecating `build()`, I actually 
proposed in KIP-591 to deprecate the parameterless `build()` method and only 
keep `build(Properties)`...
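
For illustration, a minimal sketch of opting into the optimizer (topic and 
application names are made up; `TOPOLOGY_OPTIMIZATION_CONFIG` is the constant 
in newer clients). Note that only the `build(Properties)` overload lets the 
optimizer see the config:

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class OptimizationExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app"); // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // enable the optimizer (source-changelog reuse, repartition-topic merging, ...)
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.table("input-topic"); // candidate for the source-changelog optimization

        // the parameterless build() cannot apply config-driven optimizations;
        // build(Properties) can
        final Topology topology = builder.build(props);
        System.out.println(topology.describe());
    }
}
{code}
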
{quote}you have to pass two different instances of the same serde supplier 
instead of the same instance of the serde supplier twice in order to get a 
changelog topic for your source table
{quote}
Luckily, there is nothing like a SerdeSupplier :)

In general, I think we should try to do this optimization by default, because 
(1) it reduces the cost of running a topology, (2) we have, in general, 
positive experience with it, and (3) the most common use case for 
`builder.table()` seems to be that people just want to load a changelog topic 
into a table without any data modification. – All the mentioned corner cases 
for which we cannot do the optimization seem to be in the minority.
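
To make that common case concrete, a sketch (hypothetical topic and store 
names) of a plain source table; with optimization enabled, no extra 
`-changelog` topic is created and restore reads the input topic's bytes 
directly:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;

public class SourceTableExample {
    public static Topology topology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // plain "changelog topic into a table": no mapping, no filtering
        builder.table(
            "user-profiles",                                 // hypothetical input topic
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as("user-profiles-store"));         // hypothetical store name
        return builder.build();
    }
}
{code}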

For schema evolution, I don't buy the argument that the serde is strictly 
asymmetric, or to be more precise, from a correctness point of view I don't see 
why it's problematic. If a serde reads a byte array A (with the old schema) and 
writes it back as byte array B (with the new schema), then even if we apply the 
optimization and push byte array A into the store later during a restore, the 
serde would still be able to read it (it was also able to read A from the 
source topic, so it can also read it from the store). For serdes that actually 
do a projection-on-read, I agree with Sophie that this should be discouraged in 
general and a proper map()/mapValues() should be used instead – for this case, 
the user automatically opts out anyway, as there are two tables and the second 
table will get its own changelog topic (if a user wants to do the projection 
with the serde for performance reasons, they can still do so and add a 
no-op/identity mapValues() step in between, as sketched below – I would rather 
optimize for less experienced users than for experienced ones. To quote my 
favorite API design advice: "simple things should be easy, and hard things 
should be possible").

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up 
> the record would fail with a deserialization exception if the value part 
> cannot be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for 
> the GlobalKTable case). It's unclear to me at the moment how this issue 
> could be addressed for KTables, though.
> Note that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic), and thus the 
> corrupted record would not be written into the changelog.
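
For reference (an editorial sketch, not part of the ticket): configuring the 
handler mentioned above with the built-in `LogAndContinueExceptionHandler`, 
which drops corrupted records during normal processing but, as described, is 
bypassed by a byte-for-byte restore:

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class HandlerConfig {
    public static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app"); // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // corrupted records are logged and skipped during normal processing;
        // per this ticket, a byte-for-byte restore bypasses this handler
        props.put(
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class);
        return props;
    }
}
{code}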


