[ 
https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692834#comment-16692834
 ] 

Guozhang Wang commented on KAFKA-7658:
--------------------------------------

[~jfilipiak] Good question! :)

Originally I thought KStream.toTable was just syntactic sugar for a dummy 
KStream.reduce() function, which I did not think was worth adding. But as 
described above, while looking into some reported issues with KStream 
aggregations we found an overlooked semantic difference between the two. 
More specifically, the aggregation / reduction functions that users provide are 
assumed to be commutative, i.e. aggregate(A, B, C, null, D) should be the same 
as aggregate(A, null, C, B, D). If the user-provided function is not 
commutative (e.g. a naive implementation of top-k or median, or one where the 
user interprets a returned null value as a "delete" on the underlying KTable), 
then Streams is not guaranteed to handle out-of-order data correctly.
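
A minimal plain-Java sketch of the ordering problem. It models a per-key 
reduction with a simple loop and is not the Streams API; the class and method 
names (ReduceOrderDemo, reduceInOrder) are made up for illustration, and 
skipping nulls follows the behaviour described in the issue text:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative model only: NOT the Kafka Streams API.
final class ReduceOrderDemo {

    // Folds the values of one key in arrival order. Null values are skipped,
    // mirroring the statement that KStream aggregations drop null-valued input.
    static <V> V reduceInOrder(List<V> arrivals, BinaryOperator<V> reducer) {
        V current = null;
        for (V value : arrivals) {
            if (value == null) {
                continue; // never reaches the reducer
            }
            current = (current == null) ? value : reducer.apply(current, value);
        }
        return current;
    }

    public static void main(String[] args) {
        List<Integer> inOrder = Arrays.asList(1, 2, null, 3);
        List<Integer> outOfOrder = Arrays.asList(1, 3, null, 2);

        // Order-insensitive reducer: both arrival orders give the same result.
        System.out.println(reduceInOrder(inOrder, Integer::max));
        System.out.println(reduceInOrder(outOfOrder, Integer::max));

        // Order-sensitive reducer ("always take the new value"): the result
        // depends on which record happens to arrive last.
        System.out.println(reduceInOrder(inOrder, (oldV, newV) -> newV));
        System.out.println(reduceInOrder(outOfOrder, (oldV, newV) -> newV));
    }
}
```

With the max reducer the two orders agree; with the "take the new value" 
reducer they do not, which is the case where out-of-order data becomes visible 
in the result.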

On the other hand, KStream.toTable simply changes how we interpret the stream: 
as a changelog stream instead of a record stream. So it is actually different 
from the dummy reduce() function above.
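
To make the difference concrete, here is a rough plain-Java model of the two 
interpretations, based on the null-value behaviour given in the issue 
description quoted below. It uses ordinary maps rather than the Streams API, 
and the names (TableSemanticsDemo, reduceTakingLatest, applyAsChangelog) are 
hypothetical:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: NOT the Kafka Streams API.
final class TableSemanticsDemo {

    // Record-stream view: a dummy "take the new value" reduction.
    // Null values are dropped before the reducer sees them.
    static Map<String, String> reduceTakingLatest(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> record : records) {
            if (record.getValue() == null) {
                continue; // dropped, so the old value stays in the table
            }
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    // Changelog view: every record is an upsert and a null value is a
    // tombstone that deletes the key.
    static Map<String, String> applyAsChangelog(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> record : records) {
            if (record.getValue() == null) {
                table.remove(record.getKey());
            } else {
                table.put(record.getKey(), record.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // SimpleEntry is used because it permits null values.
        List<Map.Entry<String, String>> records = Arrays.asList(
                new SimpleEntry<>("alice", "a1"),
                new SimpleEntry<>("bob", "b1"),
                new SimpleEntry<>("alice", (String) null));

        System.out.println(reduceTakingLatest(records)); // alice is still present
        System.out.println(applyAsChangelog(records));   // alice has been deleted
    }
}
```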

To answer your question above: we've added an optimization mechanism to the 
Streams DSL as part of https://issues.apache.org/jira/browse/KAFKA-6761, in 
which the topology is generated lazily instead of eagerly as we parse the DSL. 
To illustrate the idea, see this PR: 
https://github.com/apache/kafka/pull/5779, which is for "not materializing" a 
source KTable when its Materialized object is specified but the 
corresponding queryable name is not.
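
For reference, a small sketch of how the optimization is switched on, assuming 
the mechanism meant here is the one controlled by the "topology.optimization" 
config (default "none"). The snippet uses only java.util.Properties so that it 
runs on its own; the class name and the application id are placeholders:

```java
import java.util.Properties;

// Sketch only: builds the configuration, does not start a Streams application.
final class OptimizationConfigDemo {

    static Properties optimizedProps() {
        Properties props = new Properties();
        props.setProperty("application.id", "to-table-demo"); // placeholder id
        // With kafka-streams on the classpath the same key and value are
        // available as StreamsConfig.TOPOLOGY_OPTIMIZATION and
        // StreamsConfig.OPTIMIZE.
        props.setProperty("topology.optimization", "all");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(optimizedProps().getProperty("topology.optimization"));
    }
}
```

As far as I know the same Properties object also has to be handed to 
StreamsBuilder#build(Properties), since the rewriting happens when the 
topology is built.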

> Add KStream#toTable to the Streams DSL
> --------------------------------------
>
>                 Key: KAFKA-7658
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7658
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: needs-kip, newbie
>
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code}
> KTable KStream.toTable()
> KTable KStream.toTable(Materialized)
> {code}
> The function re-interprets the event stream {{KStream}} as a changelog stream 
> {{KTable}}. Note that this should NOT be treated as syntactic sugar for a dummy 
> {{KStream.reduce()}} function that always takes the new value, as it has the 
> following differences: 
> 1) an aggregation operator of {{KStream}} aggregates an event stream 
> into an evolving table, and drops null-values from the input event 
> stream; whereas a {{toTable}} function completely changes the semantics 
> of the input stream from event stream to changelog stream: null-values 
> are still serialized, and if the resulting bytes are also null they are 
> interpreted as "deletes" to the materialized KTable (i.e. tombstones in 
> the changelog stream).
> 2) the aggregation result {{KTable}} is always materialized, whereas the 
> KTable resulting from {{toTable}} may only be materialized if the overloaded 
> function with Materialized is used (and if optimization is turned on it may 
> still be only logically materialized if the queryable name is not set).
> Therefore, users who want to turn an event stream into a changelog stream 
> (no matter why they cannot read from the source topic as a changelog stream 
> {{KTable}} at the beginning) should use this new API instead of 
> the dummy reduction function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
