Hi Ivan,

If I understand you correctly, the issue with the leftJoin is that your
stream contains records with key==null, and those records get
dropped?

What about this:

streamBB = streamB.selectKey(..);
streamC = streamBB.leftJoin(tableA);
streamBNull = streamBB.filter((k,v) -> k == null);

Thus, streamBNull contains all the records that are dropped by the join
and are therefore not contained in streamC.
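
To make the split concrete, here is a minimal plain-Java sketch of the same
logic. It is only a model and does not use the Kafka Streams API: the names
NullKeySplit, Result and split are hypothetical, the key rule in main is an
assumption made up for the example, and lists stand in for the streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the split; NOT the Kafka Streams API.
// All names (NullKeySplit, Result, split) are hypothetical.
class NullKeySplit {

    // Holds the two outputs of the split.
    static final class Result {
        final List<String> joined = new ArrayList<>();   // models streamC
        final List<String> nullKey = new ArrayList<>();  // models streamBNull
    }

    static Result split(List<String> streamB,
                        Function<String, String> keySelector,
                        Map<String, String> tableA) {
        Result r = new Result();
        for (String value : streamB) {
            String key = keySelector.apply(value);  // models selectKey(..)
            if (key == null) {
                // A record with a null key would be dropped by the join,
                // so it goes to the "null key" output instead.
                r.nullKey.add(value);
            } else {
                // Left join: the table side is null when there is no match.
                r.joined.add(key + ":" + value + "+" + tableA.get(key));
            }
        }
        return r;
    }

    public static void main(String[] args) {
        Map<String, String> tableA = new HashMap<>();
        tableA.put("a", "A1");
        // Assumed key rule: text before '=' is the key; no '=' means no key.
        Function<String, String> keyOf =
            v -> v.contains("=") ? v.substring(0, v.indexOf('=')) : null;
        Result r = split(Arrays.asList("a=1", "orphan", "b=2"), keyOf, tableA);
        System.out.println("joined:  " + r.joined);
        System.out.println("nullKey: " + r.nullKey);
    }
}
```

Note that a record with a non-null key but no match in tableA still ends up
in the joined output (with a null table value); only null-key records land
in the second output.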

Does this help?


-Matthias



On 12/1/16 4:42 AM, Ivan Ilichev wrote:
> Hi Guys,
> 
> I am implementing a stream processor where I aggregate a stream of events
> by their keys into a KTable tableA and then I am “enriching” another
> streamB by the values of tableA.
> 
> So essentially I have this:
> 
> streamC = streamB
>   .selectKey(..)
>   .leftJoin(tableA);
> 
> This works great; however, in addition I also need to produce a stream of records
> from streamB which are the inverse, in other words records which failed the
> join (key was null for them). This is similar to what the “branch” API does
> for filtering on multiple predicates. So when the leftJoin fails I need to
> do something else with the result - potentially another enrichment by join.
> 
> Is this something that can be accomplished by Kafka Streams DSL directly or
> do I need to implement my processor which does this branching?
> 
> In this case - I would have to query the state store directly which should
> not be a problem. However, would that not be a problem in terms of
> partitioning of the state store (for tableA) and the selectKey operation on
> streamB? In other words, if two streams use the same partitioning on the
> same key, their partitions should be visible to the same instances, correct?
> 
> Using Kafka Streams 0.10.1.0 here.
> 
> Regards,
> -Ivan
> 
