Hi,

To join the two streams, both must use the same key, and the underlying
topics must have the same number of partitions. If the keys differ, you
can re-key a stream, which forces a repartition, by using:

`stream.selectKey(KeyValueMapper)`

If the number of partitions is also different, you could do:
`stream.selectKey(KeyValueMapper).through("your-new-topic")`

You would need to create "your-new-topic" in advance with the correct
number of partitions.
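
To illustrate why both conditions matter, here is a minimal plain-Java
sketch of co-partitioning. It is a model only, not the Kafka client
partitioner: the class name, the `joinKey` helper, and the `tradeId:version`
key format are assumptions made for illustration, and the hash is a simple
stand-in (Kafka's default partitioner uses murmur2 on the serialized key).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Plain-Java model of co-partitioning; NOT the Kafka client partitioner.
final class CoPartitioningSketch {

    // Hypothetical composite key built from the two fields named in the question.
    static String joinKey(String tradeId, int version) {
        return tradeId + ":" + version;
    }

    // Stand-in partitioner: hash the key bytes, then map onto a partition.
    // floorMod keeps the result non-negative even for negative hashes.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }
}
```

With the same key and the same partition count, `partitionFor` returns the
same partition for a trade and its risk, so one task sees both sides of the
join. If the key or the partition count differs between the topics, that
guarantee is lost.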

Now assuming that we have the same key and the same number of partitions,
the join is something like:

`trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));`

Because the trade and risk have the same key, when a trade or risk event
arrives it is joined only against the corresponding event on the other
side (within the time window specified in the join). For example:

Trade <t1, v1>
Trade <t2, v1>
Risk <t1, v1> -> join(Trade <t1, v1>)
Risk <t2, v1> -> join(Trade <t2, v1>)
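
The behaviour in this example can be modelled in plain Java. This is a
sketch under stated assumptions, not the Kafka Streams implementation: the
class and method names are hypothetical, events are buffered in in-memory
maps, and buffered events are never expired (a real windowed join would
drop them once the window has passed).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed, time-windowed stream-stream join.
final class WindowedJoinSketch {

    // One buffered event: its value and its timestamp in milliseconds.
    private static final class Event {
        final String value;
        final long timestampMs;

        Event(String value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final long windowMs;
    private final Map<String, List<Event>> trades = new HashMap<>();
    private final Map<String, List<Event>> risks = new HashMap<>();

    WindowedJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Buffers the trade and returns joins against risks with the same key.
    List<String> onTrade(String key, String value, long timestampMs) {
        return process(key, new Event(value, timestampMs), trades, risks, true);
    }

    // Buffers the risk and returns joins against trades with the same key.
    List<String> onRisk(String key, String value, long timestampMs) {
        return process(key, new Event(value, timestampMs), risks, trades, false);
    }

    private List<String> process(String key, Event event,
                                 Map<String, List<Event>> own,
                                 Map<String, List<Event>> other,
                                 boolean eventIsTrade) {
        own.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        List<String> joined = new ArrayList<>();
        for (Event candidate : other.getOrDefault(key, Collections.<Event>emptyList())) {
            // Only events whose timestamps lie within the window are joined.
            if (Math.abs(event.timestampMs - candidate.timestampMs) <= windowMs) {
                joined.add(eventIsTrade
                        ? event.value + "+" + candidate.value
                        : candidate.value + "+" + event.value);
            }
        }
        return joined;
    }
}
```

In this model, a risk only ever meets trades stored under its own key, so
a risk for t2 is never compared against the trade or risk for t1.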

Note: if multiple events for the same key arrive within the same JoinWindow
you will get multiple outputs. However, you could stop the duplicates from
going downstream by using `transformValues(..)` after the join. You would
attach a StateStore to the `transformValues`, i.e., by first creating the
store and then passing in the store name as a param to the method. Then,
when a join result for a given key arrives, your transformer would first
check the store for an existing result. If there isn't one, it updates the
store and sends the result downstream. If there is one, it drops the new
result.
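
The check-then-store logic described above can be sketched in plain Java.
This is an illustration only: the class and method names are hypothetical,
and the `HashMap` is an assumed stand-in for the StateStore that would be
attached to `transformValues`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of "emit only the first join result per key".
final class FirstResultFilterSketch {

    // Stand-in for the state store; maps join key -> first result seen.
    private final Map<String, String> firstResultByKey = new HashMap<>();

    // Returns the result if it is the first for this key, otherwise empty.
    Optional<String> onJoinResult(String key, String joinedValue) {
        if (firstResultByKey.containsKey(key)) {
            return Optional.empty(); // already matched: drop the duplicate
        }
        firstResultByKey.put(key, joinedValue); // remember the first match
        return Optional.of(joinedValue);        // and send it downstream
    }
}
```

Only the first result for each key passes through; later results for the
same key come back empty and would not be forwarded.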

Regards,
Damian




On Fri, 18 May 2018 at 22:57 Peter Kleinmann <nnamni...@gmail.com> wrote:

> Dear community, sorry in advance for what will be a newbie question:
>
>
> suppose I have two topics
> trades
> risks
>
> and I want to join a trade in the trades topic to a risk message in the
> risks topic by fields tradeId, and version, which exist in both trade and
> risk messages.
>
> Seems I can naturally create streams on top of each topic, but here is the
> question:
>
> Suppose in one period between time boundary b0 and b1 trades t1 and t2
> arrive, and risk r1 matching t1 arrives.
>
> In the next period, risk r2 arrives matching t2.
>
> a) How do I join r2 to t2?
>
> b) How do I not reprocess t1 and r1?
>
> I'm going to have between 2 million and 25 million trades and risks a day,
> so once a trade and risk have been matched, I don't want to handle them
> again.
>
> Do I need to sink the kafka topics to something like postgres, and have a
> unmatched trades table
> unmatched risks table
> matched table
>
> Many Many Thanks in Advance!!!
>
