Hi Guozhang,

Thank you for your response. When the records do come through, the right
side is null. I can provide you the full source code if you would like to
take a look at it. This is just an example, so there is very little code.
It is in a private GitLab repository right now. If you want access, feel
free to email me directly and I can send you the link and grant access.

Thanks,
Chad

On Tue, Aug 9, 2022 at 6:11 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Chad,
>
> Here are a few thoughts off the top of my head: for left joins, we keep
> the records received from the left side that have NOT found a match on
> the right side in a separate temporary store (this was only recently
> improved, but since you're already on 3.2.1 it applies to you). When,
> e.g., a right-hand-side record later arrives and finds a match in the
> temporary left-hand-side "no-matching-yet" store, we delete the record
> from that store and emit the join result. But if no match is found before
> the join window elapses, we still emit the record from the
> "no-matching-yet" store, with the join result as (left, null).
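To make the mechanism described above concrete, here is a toy, single-partition model in plain Java. It is a sketch under stated assumptions, not the Kafka Streams implementation: the names (LeftJoinSketch, onLeft, onRight, noMatchYet) are hypothetical, records are assumed to arrive in timestamp order (late and out-of-order records are not modeled), and the expiry rule "emit (left, null) once stream time is strictly past ts + window + grace" is an approximation of the real condition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Toy single-partition model of a stream-stream LEFT join that parks
 * unmatched left records in a "no-matching-yet" buffer.
 * Hypothetical names; NOT the Kafka Streams implementation.
 */
public class LeftJoinSketch {

    record Rec(String key, String value, long ts) {}

    private final long windowMs;  // join window (time difference)
    private final long graceMs;   // grace period (0 for "WithNoGrace")
    private final List<Rec> leftStore = new ArrayList<>();
    private final List<Rec> rightStore = new ArrayList<>();
    private final List<Rec> noMatchYet = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE; // max timestamp seen so far

    public LeftJoinSketch(long windowMs, long graceMs) {
        this.windowMs = windowMs;
        this.graceMs = graceMs;
    }

    // Same key and timestamps no more than windowMs apart.
    private boolean inWindow(Rec a, Rec b) {
        return a.key().equals(b.key()) && Math.abs(a.ts() - b.ts()) <= windowMs;
    }

    public void onLeft(String key, String value, long ts) {
        Rec left = new Rec(key, value, ts);
        leftStore.add(left);
        boolean matched = false;
        for (Rec right : rightStore) {
            if (inWindow(left, right)) {
                output.add("(" + left.value() + ", " + right.value() + ")");
                matched = true;
            }
        }
        if (!matched) {
            noMatchYet.add(left); // park it until a match arrives or its window closes
        }
        advance(ts);
    }

    public void onRight(String key, String value, long ts) {
        Rec right = new Rec(key, value, ts);
        rightStore.add(right);
        for (Rec left : leftStore) {
            if (inWindow(left, right)) {
                output.add("(" + left.value() + ", " + right.value() + ")");
                noMatchYet.remove(left); // matched: no (left, null) for this record
            }
        }
        advance(ts);
    }

    // Stream time only moves forward; flush buffered left records whose window has closed.
    private void advance(long ts) {
        streamTime = Math.max(streamTime, ts);
        Iterator<Rec> it = noMatchYet.iterator();
        while (it.hasNext()) {
            Rec left = it.next();
            if (left.ts() + windowMs + graceMs < streamTime) { // assumed expiry rule
                output.add("(" + left.value() + ", null)");
                it.remove();
            }
        }
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        LeftJoinSketch join = new LeftJoinSketch(500, 0);
        join.onLeft("k1", "A", 0);     // no match: buffered, nothing emitted
        join.onLeft("k2", "B", 1000);  // stream time -> 1000, so A is flushed as (A, null)
        join.onLeft("k3", "C", 1100);  // buffered
        join.onRight("k3", "z", 1200); // matches C within 500 ms: (C, z)
        System.out.println(join.output()); // [(A, null), (C, z)]
    }
}
```

In this run A is emitted only when B's timestamp pushes stream time past A's window, C is matched by z, and B stays buffered indefinitely because no later record arrives to advance stream time.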
>
> In your case, I think the arrival of the second record advances the
> inferred stream time, and once stream time has moved past the first
> record's join window, that record (originally in the "no-matching-yet"
> store) can no longer find a match. So we emit it, but, as I said, when
> that happens the join function is executed with the right side as
> "null". So my question is: when you see the join function executed with
> the first record on the left side, is the right side "null"? If so, I
> think that's consistent with what I'm describing here.
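A smaller sketch of the stream-time point: stream time only moves forward when a new record arrives in the same task (i.e., the same partition), whatever its key, so a buffered left record is flushed by whichever later record pushes stream time past its window end, and never if no such record shows up. flushedBy is a hypothetical helper, and the strict comparison against ts + window + grace is an assumed approximation; the real code may differ at the boundary.

```java
import java.util.List;
import java.util.OptionalLong;

public class StreamTimeSketch {

    // Returns the timestamp of the first later record whose arrival pushes stream
    // time past the pending record's window end, or empty if none does.
    // Assumes stream time equals pendingTs when the pending record is buffered.
    static OptionalLong flushedBy(long pendingTs, List<Long> laterTimestamps,
                                  long windowMs, long graceMs) {
        long streamTime = pendingTs;
        for (long ts : laterTimestamps) {
            streamTime = Math.max(streamTime, ts); // stream time never goes backwards
            if (pendingTs + windowMs + graceMs < streamTime) { // assumed expiry rule
                return OptionalLong.of(ts);
            }
        }
        return OptionalLong.empty(); // nothing advanced stream time far enough
    }

    public static void main(String[] args) {
        // Pending left record at t=0 with a 500 ms window.
        System.out.println(flushedBy(0, List.of(200L, 450L, 1000L), 500, 0)); // OptionalLong[1000]
        System.out.println(flushedBy(0, List.of(200L, 450L), 500, 0));        // OptionalLong.empty
        System.out.println(flushedBy(0, List.of(1000L), 500, 1000));          // OptionalLong.empty
    }
}
```

The second call mirrors "some records on the left never get processed": without a later record, nothing triggers the flush. The third shows that adding grace pushes the (left, null) result out even further.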
>
>
> Guozhang
>
>
>
>
> On Thu, Aug 4, 2022 at 9:29 AM Chad Preisler <chad.preis...@gmail.com> wrote:
>
> > Hello,
> >
> > I'm doing a stream-to-stream leftJoin. Here is what I see when I test
> > the code.
> >
> > - I write a record to the left-side topic. The stream app reads the
> > message and the deserializer gets triggered. However, the join is not
> > triggered at this time.
> >
> > - I write another record to the left-side topic (different key) and I see
> > the deserializer get called for that topic. I then see the deserializer
> > called a second time, for a store-changelog topic, and it deserializes the
> > first record. The leftJoin code is then executed for the first record
> > submitted. This behavior isn't even consistent: some records on the left
> > never get processed.
> >
> > Why are the records not all processed right away, or at all? My join
> > window is just 500ms.
> >
> > I'm using the Kafka 3.2.1 client.
> >
> > Here is a code snippet of the leftJoin.
> >
> >         KStream<String, Party> partyStream = streamsBuilder.stream(PARTY_TOPIC,
> >                 Consumed.with(Serdes.String(), partySerde));
> >
> >         KStream<String, ItemList> itemListStream = streamsBuilder.stream(TODO_ITEMS_LIST_TOPIC,
> >                 Consumed.with(Serdes.String(), itemListSerde));
> >
> >         KStream<String, Party> updatedPartyStream = partyStream.leftJoin(itemListStream,
> >                 (party, itemList) -> {
> >                     if (itemList != null) {
> >                         party.setToDoItems(itemList.getToDoItems());
> >                     }
> >                     return party;
> >                 },
> >                 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(500)),
> >                 StreamJoined.with(Serdes.String(), partySerde, itemListSerde));
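The joiner lambda in the snippet can be exercised on its own to confirm how it behaves when the right side is null, which is what a left record flushed after its window expires receives. Party and ItemList below are hypothetical minimal stand-ins for the real classes (only getToDoItems/setToDoItems are assumed), and the lambda is held as a plain java.util.function.BiFunction rather than a Kafka Streams ValueJoiner so the sketch runs without Kafka on the classpath.

```java
import java.util.List;
import java.util.function.BiFunction;

public class JoinerSketch {

    // Hypothetical stand-in for the real ItemList type.
    static class ItemList {
        private final List<String> toDoItems;
        ItemList(List<String> toDoItems) { this.toDoItems = toDoItems; }
        List<String> getToDoItems() { return toDoItems; }
    }

    // Hypothetical stand-in for the real Party type.
    static class Party {
        private List<String> toDoItems = List.of();
        List<String> getToDoItems() { return toDoItems; }
        void setToDoItems(List<String> items) { this.toDoItems = items; }
    }

    // Same logic as the leftJoin lambda: in a left join the right side may be null.
    static final BiFunction<Party, ItemList, Party> JOINER = (party, itemList) -> {
        if (itemList != null) {
            party.setToDoItems(itemList.getToDoItems());
        }
        return party;
    };

    public static void main(String[] args) {
        Party matched = JOINER.apply(new Party(), new ItemList(List.of("buy cake")));
        Party unmatched = JOINER.apply(new Party(), null); // what a window-expired left record gets
        System.out.println(matched.getToDoItems());   // [buy cake]
        System.out.println(unmatched.getToDoItems()); // []
    }
}
```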
> >
> > Thanks,
> > Chad
> >
>
>
> --
> -- Guozhang
>
