Hello,

They were getting processed by the same consumer, as we only had a single
machine running this.
What we ended up doing was building essentially the same topology, but
interacting directly with the state store through the Processor API instead
of the DSL. That seems to have fixed everything (and made it much quicker).
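
Roughly, the shape of the processor (a simplified sketch rather than our
actual code; the class, store, and type names below are made up):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Appends each incoming value to the per-key list kept in a local state store,
// instead of joining against a KTable and writing the result back to its topic.
public class AppendToListProcessor
        implements Processor<String, String, String, List<String>> {

    private final String storeName;
    private ProcessorContext<String, List<String>> context;
    private KeyValueStore<String, List<String>> store;

    public AppendToListProcessor(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(ProcessorContext<String, List<String>> context) {
        this.context = context;
        this.store = context.getStateStore(storeName);
    }

    @Override
    public void process(Record<String, String> record) {
        List<String> values = store.get(record.key());
        if (values == null) {
            values = new ArrayList<>();
        }
        values.add(record.value());
        // The store is the single source of truth, so the read-modify-write is
        // local and no longer races against a round trip through the table topic.
        store.put(record.key(), values);
        context.forward(record.withValue(values));
    }
}

Something like this gets wired in with StreamsBuilder#addStateStore plus
KStream#process (passing the store name), so the processor and its store live
on the same task.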

Best,
Mauricio

On 2023/08/28 12:04:12 Claudia Kesslau wrote:
> Hi,
>
> I'm definitely no expert, but to me it sounds as if not all your messages are
> getting processed by the same consumer. Are you using the key `foo` for
> partitioning? Is `baz` actually another key, or is this a mix-up in your
> example and `baz` is another value with key `foo`?
>
> Hope you find a solution to your problem.
>
> Best,
> Claudia
> ________________________________
> From: Mauricio Lopez <ml...@silversky.com>
> Sent: Thursday, 17 August 2023 22:57
> To: users@kafka.apache.org <us...@kafka.apache.org>
> Subject: Table updates are not consistent when doing a join with a Stream
>
> Hello Folks,
>
> We are having an issue with a Kafka Streams Java application.
> We have a KStream and a KTable which are joined using a left join. The
> entries in the KTable are constantly updated with the new information that
> comes from the KStream: each KStream message adds an entry to an array that
> the KTable holds for its key. The updated array is sent back to the KTable's
> topic, expanding the array every time a new message comes in from the KStream.
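>
> For context, a simplified sketch of the topology shape (the topic names,
> class name, and list serde below are placeholders, not our real ones):
>
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.streams.kstream.Produced;
>
> public class ArrayJoinTopology {
>
>     // Stream-table left join; the join result is written back to the topic
>     // that backs the table, so the table should pick it up again.
>     public static Topology build(Serde<List<String>> listSerde) {
>         StreamsBuilder builder = new StreamsBuilder();
>
>         KTable<String, List<String>> table =
>             builder.table("table-topic",
>                 Consumed.with(Serdes.String(), listSerde));
>         KStream<String, String> events =
>             builder.stream("events-topic",
>                 Consumed.with(Serdes.String(), Serdes.String()));
>
>         events
>             .leftJoin(table, (event, current) -> {
>                 List<String> updated =
>                     current == null ? new ArrayList<>() : new ArrayList<>(current);
>                 updated.add(event);
>                 return updated;
>             })
>             .to("table-topic", Produced.with(Serdes.String(), listSerde));
>
>         return builder.build();
>     }
> }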
>
> As an example, what should be happening (and what happens in our unit tests) 
> is:
>
>
>   *   KTable has an empty array for key “foo”: []
>   *   Event 1 comes with key “foo” and value “bar”
>   *   KTable gets updated to “foo”: [“bar”], sending this update to the same
> topic that the KTable is plugged into.
>   *   Event 2 comes with key “baz”
>   *   The update is pulled into memory by the KTable, and the KTable gets
> updated to “foo”: [“bar”, “baz”], sending this change to the same topic that
> the KTable is plugged into. “baz” was appended to the array for key “foo”.
>
> But what is happening is the following:
>
>
>   *   KTable has an empty array for key “foo”: []
>   *   Event 1 comes with key “foo” and value “bar”
>   *   KTable gets updated to “foo”: [“bar”] in the joiner, sending an event
> to the same topic that the KTable is plugged into.
>   *   Event 2 comes with key “baz”
>   *   KTable gets updated to “foo”: [“baz”] in the joiner, sending an event
> to the same topic that the KTable is plugged into afterwards.
>
> This happens multiple times, and after a couple of seconds one of the
> incoming messages is finally appended, but many of them are lost. As you can
> see, we suspect that when Event 2 is received, the KTable has somehow not yet
> consumed the first update (the one that added “bar” to the array), so the new
> value overwrites the array instead of being appended to it.
> This means that many events are missed, and we cannot get the KTable to
> retain the data from all the events; updates from some events are simply
> overwritten by others.
>
> So far, we have tried:
>
>
>   *   Setting STATESTORE_CACHE_MAX_BYTES_CONFIG to 0, to force the app not
> to cache any changes and to send them to the output topic instantly.
>   *   Setting COMMIT_INTERVAL_MS_CONFIG to 0, to force the app to send all
> updates instantly.
>   *   Setting TOPOLOGY_OPTIMIZATION_CONFIG to “reuse.ktable.source.topics”
> and to “all”, in case some optimization pattern could help us (the settings
> are sketched below).
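>
> For reference, roughly how those were set (constant names from StreamsConfig;
> a sketch of the relevant lines only, not our full configuration):
>
> Properties props = new Properties();   // java.util.Properties
> // No record caching: forward every state store update downstream immediately.
> props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
> // Commit (and flush) as often as possible.
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
> // Tried each optimization value in turn.
> props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
>     "reuse.ktable.source.topics");
> // props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
> //     StreamsConfig.OPTIMIZE);   // "all"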
>
>
> None of these have allowed us to get a fully consistent update of the KTable
> each time a new event arrives; it still overwrites or misses updates made by
> incoming events. Can someone advise whether there’s a way to make the KTable
> get successfully updated by each one of the events, as the first example
> shows?
>
> Thanks,
>
> Mauricio L
>

Mauricio López S.
Software Engineer
