After some more investigation, I can confirm that the KTable-KTable inner
join emits a record on every update of either KTable. If there is no match,
the record that is emitted is null. This may be a conscious design decision
due to the continuous nature of the join, although I'd love to hear
confirmation or commentary on that.

Assuming the above is true, I think a KTable-KTable join followed directly
by a groupBy is simply not possible, since the groupBy would be handed those
nulls.

I discovered a different approach which seems roundabout, but appears to
work. I can convert the joined KTable to a KStream, filter out null values,
and then use a KStream.map operation to change the key (rather than
repartitioning via groupBy). Finally, reduceByKey can get us back to a
KTable:

table1.join(table2, joiner)
    .toStream()
    .filterNot((k, v) -> k == null || v == null)
    .map( ... )
    .reduceByKey( ... )
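
To check the logic of that chain for myself, here is a minimal plain-Java
sketch that mimics the three steps (filter nulls, change the key, keep the
latest value per new key) on an in-memory list of join updates. It does not
use Kafka Streams at all; the class RekeySketch and the helpers match,
noMatch and rekey are made-up names for illustration, and "keep the latest
value" is only my assumption about what the reducer would do.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RekeySketch {
    // A join update where both sides matched: customerId -> (accountId, userId).
    static SimpleEntry<Integer, SimpleEntry<Integer, String>> match(
            Integer customerId, Integer accountId, String userId) {
        return new SimpleEntry<Integer, SimpleEntry<Integer, String>>(
                customerId, new SimpleEntry<Integer, String>(accountId, userId));
    }

    // A join update with no match: the joined value is null.
    static SimpleEntry<Integer, SimpleEntry<Integer, String>> noMatch(Integer customerId) {
        return new SimpleEntry<Integer, SimpleEntry<Integer, String>>(customerId, null);
    }

    // Toy equivalent of filterNot(...).map(...).reduceByKey(...).
    static Map<Integer, String> rekey(
            List<SimpleEntry<Integer, SimpleEntry<Integer, String>>> joinedUpdates) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (SimpleEntry<Integer, SimpleEntry<Integer, String>> update : joinedUpdates) {
            // filterNot: skip updates with a null key or a null joined value.
            if (update.getKey() == null || update.getValue() == null) {
                continue;
            }
            // map: the joined pair (accountId, userId) becomes the new key/value.
            SimpleEntry<Integer, String> rekeyed = update.getValue();
            // reduceByKey: assumed reducer that keeps the newest value per key.
            table.put(rekeyed.getKey(), rekeyed.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<SimpleEntry<Integer, SimpleEntry<Integer, String>>> updates = Arrays.asList(
                noMatch(1),
                match(1, 100, "alice"),
                match(2, 200, "bob"),
                match(1, 100, "alicia"));
        System.out.println(rekey(updates)); // prints {100=alicia, 200=bob}
    }
}
```

The null update for customer 1 is dropped before the key change, so the
re-keying step never sees a null value.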

Is it necessary to convert to a KStream in order to filter out the null
join values?

---------- Forwarded message ----------
From: Jeff Klukas <[email protected]>
To: [email protected]
Cc: Guozhang Wang <[email protected]>
Date: Wed, 8 Jun 2016 10:56:26 -0400
Subject: Handling of nulls in KTable groupBy

I have a seemingly simple case where I want to join two KTables to produce
a new table with a different key, but I am getting NPEs. My understanding
is that to change the key of a KTable, I need to do a groupBy and a reduce.

What I believe is going on is that the inner join operation is emitting
nulls in the case that no matching record is found in one of the source
KTables. The groupBy operation then receives null inputs that it's not
expecting.

Here is the snippet of code where I define the join and the groupBy:

customerIdToAccountIdLookup.join(customerIdToUserIdLookup,
                (Integer accountId, String userId) -> {
                    return new KeyValue<>(accountId, userId);
                })
                .groupBy((Integer customerId, KeyValue<Integer, String> kv) -> {
                    return kv;
                }, Serdes.Integer(), Serdes.String())

This produces the following exception:

! java.lang.NullPointerException: null
! at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)
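
To illustrate what I suspect is happening, here is a toy plain-Java model of
the failure. It is only a sketch of my hypothesis, not the actual
KTableRepartitionMap code: the class NullJoinSketch and the methods
selector, newKey and failsOn are hypothetical names, and the assumption is
that some downstream step reads the key of whatever the selector returns.

```java
import java.util.AbstractMap.SimpleEntry;

class NullJoinSketch {
    // Shaped like the groupBy selector above: hands back the joined pair unchanged.
    static SimpleEntry<Integer, String> selector(Integer customerId,
                                                 SimpleEntry<Integer, String> kv) {
        return kv;
    }

    // Hypothetical downstream step (NOT the real Kafka Streams internals):
    // it reads the new key from the selector's result.
    static Integer newKey(Integer customerId, SimpleEntry<Integer, String> joined) {
        // Throws NullPointerException when joined is null.
        return selector(customerId, joined).getKey();
    }

    // Reports whether newKey fails with a NullPointerException for this input.
    static boolean failsOn(Integer customerId, SimpleEntry<Integer, String> joined) {
        try {
            newKey(customerId, joined);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A matched join result goes through fine.
        System.out.println(failsOn(1, new SimpleEntry<Integer, String>(100, "alice"))); // prints false
        // A null join result (no match) triggers the NPE.
        System.out.println(failsOn(1, null)); // prints true
    }
}
```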

Am I approaching this incorrectly, or is there a bug going on? Should a
KTable-KTable inner join be emitting records when no match is found?
