[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692535#comment-16692535
 ] 

Matthias J. Sax commented on KAFKA-7577:
----------------------------------------

An `aggregate()` is designed as in SQL, which also drops `null`s.

Note that while an aggregation *returns* a KTable, the *input* is a KStream 
that represents facts, not updates. That is the main difference: if you 
read a topic directly into a `KTable`, you tell Kafka Streams to use update 
semantics, including tombstone semantics. However, when you aggregate a 
`KStream`, there are no update/tombstone semantics for the *input* records.
{quote}how are deletions suppose to propagate 
{quote}
There is no reason to propagate them, as this concept does not exist in a 
KStream. Also note that `KStream#groupByKey()` returns a `KGrouped*Stream*`, 
not a `KGroupedTable` (that would be the result of `KTable#groupBy()`). It 
might seem subtle, but from my point of view an `aggregate()` is not the same 
as an UPSERT (which is what you want).
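
To make the distinction concrete, here is a toy model in plain Java. It is 
only a sketch and does not use the Kafka Streams API: the class 
`SemanticsSketch` and its methods are made up for illustration, and the 
"aggregation" simply keeps the latest non-null value per key. It shows why a 
`null` record removes nothing under stream aggregation but deletes the key 
under table (UPSERT) semantics.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: plain Java collections, NOT the Kafka Streams API.
// All names here are hypothetical and exist purely for illustration.
class SemanticsSketch {

    // Helper to build a key/value record; the value may be null.
    static Map.Entry<String, String> entry(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Stream-style aggregation: every record is an independent fact.
    // A record with a null value is skipped, so it never removes anything.
    static Map<String, String> aggregateLatest(List<Map.Entry<String, String>> records) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getValue() == null) {
                continue; // dropped, as a SQL aggregate ignores NULLs
            }
            result.put(r.getKey(), r.getValue());
        }
        return result;
    }

    // Table-style UPSERT: a null value is a tombstone and deletes the key.
    static Map<String, String> upsert(List<Map.Entry<String, String>> records) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getValue() == null) {
                result.remove(r.getKey()); // tombstone
            } else {
                result.put(r.getKey(), r.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = Arrays.asList(
                entry("a", "1"),
                entry("b", "2"),
                entry("a", null)); // would be a tombstone in a changelog

        System.out.println(aggregateLatest(records)); // prints {a=1, b=2}
        System.out.println(upsert(records));          // prints {b=2}
    }
}
```

The same input yields two different results, which is why I think the two 
behaviors belong to two different operators.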

If you want to treat the data as table updates, you should read the topic as a 
table directly (from a semantic point of view) and create a derived table with 
`KTable#groupBy()` to repartition accordingly. If you want to avoid the 
materialization overhead for the first table, you can instead do 
`stream.map(/*select new key*/).to("my-new-changelog-topic"); KTable table = 
builder.table("my-new-changelog-topic")`.

We are currently working on an optimization that will allow reading a topic 
as a `KTable` directly, with the KTable materialized only if necessary. 
This should address your use case, as we would only materialize the second 
KTable. For now, you can use your own workaround or the intermediate topic as 
suggested. (In my personal opinion, the workaround misuses the API, as I 
believe an aggregation is not the right operator to implement an UPSERT.) 
I also want to point out that having an UPSERT operator might actually be 
useful. But instead of changing `aggregate()`, I would rather add a 
`KStream#toTable()` operator (similar to, or the reverse of, 
`KTable#toStream()`) for this use case. Because these are semantically two 
different things, there should be two different operators, IMHO.

Does this make sense?

> Semantics of Table-Table Join with Null Message Are Incorrect
> -------------------------------------------------------------
>
>                 Key: KAFKA-7577
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7577
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Daren Thomas
>            Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> (https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join).
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic like expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.


