Re: AW: Table updates are not consistent when doing a join with a Stream

2023-09-04 Thread Matthias J. Sax
Your update to the KTable is asynchronous when you send data back to the KTable
input topic. So your program is subject to race conditions.


So switching to the PAPI was the right move: it makes the update to the state
store synchronous and thus fixes the issue.
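
A minimal sketch of such a synchronous read-modify-write with the Processor API
(made-up class and store names; assuming String keys and a List<String> value
serde) could look like this:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Appends each incoming event to the list stored for its key. The store is
    // read and written within the same process() call, so the next record for
    // the same key already sees this update.
    public class AppendingProcessor implements Processor<String, String, String, List<String>> {

        private ProcessorContext<String, List<String>> context;
        private KeyValueStore<String, List<String>> store;

        @Override
        public void init(final ProcessorContext<String, List<String>> context) {
            this.context = context;
            this.store = context.getStateStore("events-store"); // store name is made up
        }

        @Override
        public void process(final Record<String, String> record) {
            List<String> values = store.get(record.key());
            if (values == null) {
                values = new ArrayList<>();
            }
            values.add(record.value());
            store.put(record.key(), values);           // synchronous local state update
            context.forward(record.withValue(values)); // emit the updated list downstream
        }
    }

Nothing is written back to the table's input topic here, so there is no
produce/consume round trip to race against.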



-Matthias

On 9/4/23 5:53 AM, Mauricio Lopez wrote:

Hello,

They were getting processed by the same consumer, as we only had a single
machine running this.
What we ended up doing is basically building the same topology, but interacting
directly with the state store using the Processor API instead of the DSL. That
seems to have fixed everything (and made it way quicker).
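
Roughly, the wiring looks like this (a simplified sketch with made-up topic and
store names, not our exact code; AppendingProcessor stands for a Processor that
does a store.get(key), appends the event, and store.put(key, list)):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    // A persistent key-value store holding the per-key list of events.
    StoreBuilder<KeyValueStore<String, List<String>>> storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("events-store"),
            Serdes.String(),
            Serdes.ListSerde(ArrayList.class, Serdes.String()));

    StreamsBuilder builder = new StreamsBuilder();
    builder.addStateStore(storeBuilder);
    builder.stream("events-topic", Consumed.with(Serdes.String(), Serdes.String()))
           .process(AppendingProcessor::new, "events-store"); // update the store directly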

Best,
Mauricio

On 2023/08/28 12:04:12 Claudia Kesslau wrote:

Hi,

I'm definitely no expert, but to me it sounds as if not all your messages are
getting processed by the same consumer. Are you using the key `foo` for
partitioning? Is `baz` actually another key, or is this a mix-up in your example
and `baz` is another value with key `foo`?

Hope you find a solution to your problem.

Best,
Claudia

From: Mauricio Lopez <ml...@silversky.com>
Sent: Thursday, August 17, 2023 22:57
To: users@kafka.apache.org
Subject: Table updates are not consistent when doing a join with a Stream

Hello Folks,

We are having an issue with a Kafka Streams Java application.
We have a KStream and a KTable which are joined using a left join. The entries
in the KTable are constantly updated by new information that comes from the
KStream: each KStream message adds entries to an array that the KTable holds for
each key. The update gets sent back to the KTable topic, expanding this array
every time a new message comes in from the KStream.
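
As a rough sketch of the topology (made-up topic names; the per-key “array” is
modeled as a simple comma-separated string just for illustration), it is
essentially:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, String> table =
        builder.table("table-topic", Consumed.with(Serdes.String(), Serdes.String()));
    KStream<String, String> events =
        builder.stream("events-topic", Consumed.with(Serdes.String(), Serdes.String()));

    events.leftJoin(table, (event, current) ->
               current == null ? event : current + "," + event)  // append the new event
          // the updated value is produced back to the table's own input topic; the table
          // only picks it up once that record has been written and consumed again
          .to("table-topic", Produced.with(Serdes.String(), Serdes.String()));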

As an example, what should be happening (and what happens in our unit tests) is:


   *   KTable has an empty array for key “foo”: []
   *   Event 1 comes with key “foo” and value “bar”
   *   KTable gets updated to “foo”: [“bar”], sending this update to the same
topic that the KTable is plugged into.
   *   Event 2 comes with key “baz”
   *   The update is pulled into memory by the KTable, and the KTable gets
updated to “foo”: [“bar”, “baz”], sending this change to the same topic that the
KTable is plugged into. “baz” was appended to the array for key “foo”.

But what is happening is the following:


   *   KTable has an empty array for key “foo”: []
   *   Event 1 comes with key “foo” and value “bar”
   *   KTable gets updated to “foo”: [“bar”] in the joiner, sending an event to
the same topic that the KTable is plugged into.
   *   Event 2 comes with key “baz”
   *   KTable gets updated to “foo”: [“baz”] in the joiner, sending an event to
the same topic that the KTable is plugged into afterwards.

This happens multiple times, and after a couple of seconds one of the incoming
messages is finally appended, but many of them are lost. As you can see, we
suspect that when Event 2 is received, the KTable has somehow not yet received
the first update that added “bar” to the array.
This means that many events are missed, and we cannot successfully get the
KTable to save the data for all of the events. Instead, it sometimes overwrites
the updates from some events.

So far, we have tried the following (see the config sketch after this list):


   *   Setting STATESTORE_CACHE_MAX_BYTES_CONFIG to 0, to try to force the app
not to cache any changes and to send updates to the output topic instantly.
   *   Setting COMMIT_INTERVAL_MS_CONFIG to 0, to try to force the app to send
all updates instantly.
   *   Setting TOPOLOGY_OPTIMIZATION_CONFIG to “reuse.ktable.source.topics” and
to “all”, in case there is some optimization pattern that could help us.
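
For reference, those settings correspond to something like this (a sketch; the
constant names assume a recent Kafka Streams release):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // disable record caches so every single update is forwarded downstream
    props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
    // commit (and flush) as often as possible
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
    // enable the source-topic reuse optimization; StreamsConfig.OPTIMIZE ("all")
    // would enable every available optimization instead
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, "reuse.ktable.source.topics");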


None of these have allowed us to get a fully consistent update of the KTable
each time a new event arrives. It always gets overwritten or misses incoming
updates made by events. Can someone advise if there’s a way to make the KTable
get successfully updated by each one of the events, as the first example shows?

Thanks,

Mauricio L






Mauricio López S.
Software Engineer


