Hi Ivan, If I understand you correct, the issue with the leftJoin is that your stream does contain records with key==null and thus those records get dropped?
What about this: streamBB = streamB.selectKey(..); streamC = streamB.leftJoin(tableA); streamBNull = streamB.filter((k,v) -> k == null); Thus streamBNull contains all the record that will drop out and not be contained in streamC. Does this help? -Matthias On 12/1/16 4:42 AM, Ivan Ilichev wrote: > Hi Guys, > > I am implementing a stream processor where I aggregate a stream of events > by their keys into a KTable tableA and then I am “enriching” another > streamB by the values of tableA. > > So essentially I have this: > > streamC = streamB > .selectKey(..) > .leftJoin(tableA); > > This works great however in add to also need to produce a stream of records > from streamB which are the inverse, in other words records which failed the > join (key was null for them). This is similar to what the “branch” API does > for filtering on multiple predicates. So when the leftJoin fails I need to > do something else with the result - potentially another enrichment by join. > > Is this something that can be accomplished by Kafka Streams DSL directly or > do I need to implement my processor which does this branching? > > In this case - I would have to query the state store directly which should > not be a problem. However - would that not be a problem in terms of > partitioning of the state store (for tableA) and the selectKey operation on > streamB. In other words - if two streams use the same partitioning on the > same key, their partitions should be visible to the same instances, correct? > > Using Kafka Streams 0.10.1.0 here. > > Regards, > -Ivan >
signature.asc
Description: OpenPGP digital signature