Hi Peter,

It depends on how you specify the JoinWindow, but using `JoinWindows.of(10 seconds)` would mean that a record will join with any other record with the matching key that arrived between 10 seconds before it and 10 seconds after it.
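Damian's description can be checked against Peter's timeline (quoted below) with a small plain-Java simulation. This is a sketch, not Kafka Streams code. It assumes event timestamps in whole seconds, keys built as `tradeId:version`, and a symmetric window whose bounds are inclusive on both sides. `JoinWindowCheck`, `joinedKeys` and `minimumWindow` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a symmetric join window (no Kafka dependency).
// Assumption: a trade and a risk join when they share a key and their
// timestamps differ by at most windowSeconds (bounds inclusive).
final class JoinWindowCheck {

    // Hypothetical event type: key is "tradeId:version", timestamp in seconds.
    record Event(String key, long timestampSeconds) {}

    // Keys of every trade/risk pair that falls inside the window.
    static List<String> joinedKeys(List<Event> trades, List<Event> risks, long windowSeconds) {
        List<String> joined = new ArrayList<>();
        for (Event trade : trades) {
            for (Event risk : risks) {
                long gap = Math.abs(trade.timestampSeconds() - risk.timestampSeconds());
                if (trade.key().equals(risk.key()) && gap <= windowSeconds) {
                    joined.add(trade.key());
                }
            }
        }
        return joined;
    }

    // Smallest window that lets every key present in both streams join at least once:
    // the closest trade/risk gap per key, maximised over all keys.
    static long minimumWindow(List<Event> trades, List<Event> risks) {
        Map<String, Long> closestGap = new HashMap<>();
        for (Event trade : trades) {
            for (Event risk : risks) {
                if (trade.key().equals(risk.key())) {
                    long gap = Math.abs(trade.timestampSeconds() - risk.timestampSeconds());
                    closestGap.merge(trade.key(), gap, Long::min);
                }
            }
        }
        long needed = 0;
        for (long gap : closestGap.values()) {
            needed = Math.max(needed, gap);
        }
        return needed;
    }

    public static void main(String[] args) {
        // Peter's timeline, all at version v1.
        List<Event> trades = List.of(new Event("t1:v1", 1), new Event("t2:v1", 4),
                new Event("t3:v1", 5), new Event("t4:v1", 47));
        List<Event> risks = List.of(new Event("t2:v1", 8), new Event("t1:v1", 14),
                new Event("t4:v1", 27), new Event("t3:v1", 37));
        System.out.println(joinedKeys(trades, risks, 10)); // [t2:v1]
        System.out.println(minimumWindow(trades, risks));  // 32
    }
}
```

With a 10-second window only `t2` joins, which matches Peter's reading. The widest trade/risk gap in the example is `t3` (trade at 5s, risk at 37s), so under these assumptions a window of at least 32 seconds would be needed for all four pairs to join.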
So your example is correct. You would need a JoinWindow large enough to allow for the expected difference in arrival time.

Thanks,
Damian

On Sat, 19 May 2018 at 19:50 Peter Kleinmann <nnamni...@gmail.com> wrote:

> Hi Damian,
>
> thank you for the informative reply. I think this answers 95% of my
> questions (or maybe 100% and I missed the explanation).
>
> What is still unresolved is how to handle trades and risks that arrive far
> apart.
>
> Suppose we have
>
> timeToAllowAJoin = 10 seconds
>
> and we have
>
> Time | Trade          | Risk
> 0s   ----------------------------------------------
> 1s   | Trade(t1, v1)  |
> 4s   | Trade(t2, v1)  |
> 5s   | Trade(t3, v1)  |
> 8s   |                | Risk(t2, v1)
> 10s  ----------------------------------------------
> 14s  |                | Risk(t1, v1)
> 20s  ----------------------------------------------
> 27s  |                | Risk(t4, v1)
> 30s  ----------------------------------------------
> 37s  |                | Risk(t3, v1)
> 40s  ----------------------------------------------
> 47s  | Trade(t4, v1)  |
> 50s  ----------------------------------------------
>
> I think
> trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));
> will join
> Risk(t2, v1) -> Trade(t2, v1)
> for window 0-10s efficiently,
> but I don't think I get the other joins, even running
> trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));
> for windows
> 10s - 20s
> 20s - 30s
> 40s - 40s
>
> If this is correct, is there another common way to handle a scenario
> like the one above?
>
> thanks in advance,
>
> Peter
>
> On Fri, May 18, 2018 at 6:27 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi,
>>
>> In order to join the two streams, they need to have the same key and the
>> same number of partitions in each topic.
>> If they don't have the same key, you can force a repartition by using:
>>
>> `stream.selectKey(KeyValueMapper)`
>>
>> If the number of partitions is also different, you could do:
>>
>> `stream.selectKey(KeyValueMapper).through("your-new-topic")`
>>
>> You would need to create "your-new-topic" in advance with the correct
>> number of partitions.
>>
>> Now, assuming that we have the same key and the same number of partitions,
>> the join is something like:
>>
>> `trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));`
>>
>> Because the trade and risk have the same key, when a trade or risk event
>> arrives you will only join against the corresponding event (within the
>> time window specified in the join). For example:
>>
>> Trade <t1, v1>
>> Trade <t2, v1>
>> Risk <t1, v1> -> join(Trade <t1, v1>)
>> Risk <t2, v1> -> join(Trade <t2, v1>)
>>
>> Note: if multiple events for the same key arrive within the same
>> JoinWindow, you will get multiple outputs. However, you could stop these
>> from going downstream by using `transformValues(..)` after the join. You
>> would attach a StateStore to the `transformValues`, i.e., by first creating
>> the store and then passing the store name as a parameter to the method.
>> Then, when a join result for a given key arrives, your transformer would
>> first check the store for an existing result. If there isn't one, update
>> the store and send the result downstream; if there is, drop it.
>>
>> Regards,
>> Damian
>>
>> On Fri, 18 May 2018 at 22:57 Peter Kleinmann <nnamni...@gmail.com> wrote:
>>
>> > Dear community, sorry in advance for what will be a newbie question:
>> >
>> > Suppose I have two topics:
>> > trades
>> > risks
>> >
>> > and I want to join a trade in the trades topic to a risk message in the
>> > risks topic by the fields tradeId and version, which exist in both trade
>> > and risk messages.
>> >
>> > Seems I can naturally create streams on top of each topic, but here is
>> > the question:
>> >
>> > Suppose that in one period, between time boundaries b0 and b1, trades t1
>> > and t2 arrive, and risk r1 matching t1 arrives.
>> >
>> > In the next period, risk r2 arrives matching t2.
>> >
>> > a) How do I join r2 to t2?
>> >
>> > b) How do I avoid reprocessing t1 and r1?
>> >
>> > I'm going to have between 2 million and 25 million trades and risks a
>> > day, so once a trade and risk have been matched, I don't want to handle
>> > them again.
>> >
>> > Do I need to sink the Kafka topics to something like Postgres and have
>> > an unmatched trades table, an unmatched risks table, and a matched table?
>> >
>> > Many Many Thanks in Advance!!!
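Damian's suggestion of a `transformValues(..)` step backed by a state store, which also addresses Peter's wish not to handle matched trades again, can be sketched in plain Java. This is a minimal simulation, not Kafka Streams code: a `HashMap` stands in for the state store, keys are assumed to be `tradeId:version` strings, and `FirstMatchOnly` and `transform` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java simulation of de-duplicating join results (no Kafka dependency).
// A HashMap stands in for the state store that would be attached to
// transformValues(..); only the first result per key is passed on.
final class FirstMatchOnly {

    // Hypothetical stand-in for a key-value state store: key -> first join result.
    private final Map<String, String> store = new HashMap<>();

    // Returns the result to send downstream, or empty if it should be dropped.
    Optional<String> transform(String key, String joinResult) {
        if (store.containsKey(key)) {
            return Optional.empty();       // a result for this key was already emitted
        }
        store.put(key, joinResult);        // remember that this key has been matched
        return Optional.of(joinResult);
    }

    public static void main(String[] args) {
        FirstMatchOnly dedup = new FirstMatchOnly();
        List<String> downstream = new ArrayList<>();
        // Two join results for t1:v1 (e.g. duplicate events in one window), one for t2:v1.
        String[][] joinResults = {
            {"t1:v1", "trade1+risk1"}, {"t1:v1", "trade1+risk1b"}, {"t2:v1", "trade2+risk2"}
        };
        for (String[] result : joinResults) {
            dedup.transform(result[0], result[1]).ifPresent(downstream::add);
        }
        System.out.println(downstream); // [trade1+risk1, trade2+risk2]
    }
}
```

In the real DSL, `transformValues` produces one output value per input, so "dropping" would likely mean returning null and filtering it out afterwards; that detail is an assumption to check against the Kafka Streams documentation for your version. The map in this sketch also grows with every matched key, which at millions of trades a day would presumably call for some retention policy in a real store.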