Hi Mathieu,

That sounds frustrating. I’m sorry for the trouble.

From what you described, it does sound like something wacky is going on with
the partitioning, especially since both joins work when you set everything to
1 partition.

You mentioned that you’re using the default partitioner everywhere. Can you 
confirm whether all the source topics and all the repartition topics also have 
the same number of partitions (when you’re not forcing them to 1, of course)?
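In case it helps with that check, here’s a stdlib-only Java sketch of the rule the default partitioner applies to a non-null key: murmur2 over the serialized key bytes, made positive, modulo the partition count. It re-implements Kafka’s `Utils.murmur2`/`Utils.toPositive` so it runs without the Kafka jars. The class name and keys are made up, and it assumes String keys serialized as UTF-8 (which is what `StringSerializer` does by default). The point is that the same key can land in a different partition when the partition count differs, so the task for partition N would never see both sides of the join.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper: stdlib-only re-implementation of the default
 * partitioning rule for non-null keys,
 * toPositive(murmur2(keyBytes)) % numPartitions.
 * Assumes String keys serialized as UTF-8.
 */
public class DefaultPartitionSketch {

    // Port of Kafka's Utils.murmur2 (32-bit murmur2 with Kafka's fixed seed).
    static int murmur2(final byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through, as in Kafka).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition a String key would go to in a topic with numPartitions partitions.
    static int partitionFor(final String key, final int numPartitions) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(final String[] args) {
        // Made-up keys: compare where each lands with 4 vs. 6 partitions.
        for (final String key : new String[] {"customer-1", "customer-2", "customer-3"}) {
            System.out.printf("%s -> p%d of 4, p%d of 6%n",
                    key, partitionFor(key, 4), partitionFor(key, 6));
        }
    }
}
```

With real data you’d swap in a key you’re tracing and the actual partition counts of the topics involved.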

Are the transformers changing the keys? If they are not, then you can use
transformValues to avoid the repartition. If they are, then the repartition
topics are indeed necessary. Streams should ensure that the repartition topics
get the same number of partitions as the topic you’re joining with.
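To illustrate why the repartition is needed when a transformer changes the key, here’s a toy model in plain Java (not Kafka code). Each task only looks up the table store for its own partition, so a re-keyed record only finds its match if it’s moved to the partition that owns the new key. `partition()` is a stand-in hash (`String.hashCode` modulo the count), not Kafka’s murmur2-based partitioner, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a per-partition stream-table left join (hypothetical, not Kafka code). */
public class CoPartitionToy {

    // Stand-in partitioner for the toy; Kafka actually hashes the key bytes with murmur2.
    static int partition(final String key, final int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Splits table rows into per-partition stores, placing each row by partition(key).
    static List<Map<String, String>> buildTable(final Map<String, String> rows, final int numPartitions) {
        final List<Map<String, String>> stores = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            stores.add(new HashMap<>());
        }
        rows.forEach((k, v) -> stores.get(partition(k, numPartitions)).put(k, v));
        return stores;
    }

    // The task for partition p can only see table store p.
    // Returns the matching table value, or null (the left join's "no match" case).
    static String lookup(final String key, final int streamPartition, final List<Map<String, String>> stores) {
        return stores.get(streamPartition).get(key);
    }

    public static void main(final String[] args) {
        final int partitions = 3;
        final List<Map<String, String>> tableA = buildTable(
                Map.of("a", "row-a", "b", "row-b", "c", "row-c"), partitions);

        // An event arrives with key "x"; a key-changing transformer re-keys it to "b".
        final int originalPartition = partition("x", partitions);

        // Without a repartition step, the re-keyed record stays where "x" was.
        final String withoutRepartition = lookup("b", originalPartition, tableA);

        // With a repartition step, it is re-sent to the partition that owns "b".
        final String withRepartition = lookup("b", partition("b", partitions), tableA);

        // A value-only transform keeps key "a", so it is already in the right partition.
        final String valueOnly = lookup("a", partition("a", partitions), tableA);

        System.out.println("without repartition: " + withoutRepartition);
        System.out.println("with repartition:    " + withRepartition);
        System.out.println("value-only:          " + valueOnly);
    }
}
```

With 3 partitions, the record originally keyed `x` sits in partition 0, so the lookup for `b` misses and the left join sees null; after repartitioning by `b`, it finds `row-b`. A value-only transform keeps the key, which is why transformValues doesn’t need this step.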

As you mentioned, I can only speculate without seeing the code. I think my next 
step would be to find a specific null join output that you think should have 
been non-null and trace the key back through the topology. You should be able 
to locate the key in each of the topics and state stores to verify that it’s in 
the right partition all the way through. 
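For the trace itself, it may help to note the partition where you find the key in each topic along the way and check them mechanically. Topics that are co-partitioned for a join must hold a given key in the same partition number, so the first hop that disagrees is where to look. This is a minimal sketch; the class and the topic names are hypothetical placeholders for your source, repartition, and changelog topics.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for a manual key trace across co-partitioned topics. */
public class KeyTrace {

    // Returns the first topic (in insertion order) whose partition differs from
    // the first observation, or null if every hop agrees.
    static String firstMismatch(final Map<String, Integer> partitionByTopic) {
        Integer expected = null;
        for (final Map.Entry<String, Integer> e : partitionByTopic.entrySet()) {
            if (expected == null) {
                expected = e.getValue();
            } else if (!expected.equals(e.getValue())) {
                return e.getKey();
            }
        }
        return null;
    }

    public static void main(final String[] args) {
        // Hypothetical observations for one key, in topology order.
        final Map<String, Integer> seen = new LinkedHashMap<>();
        seen.put("events-after-transform-repartition", 2);
        seen.put("table-a-source", 2);
        seen.put("table-a-store-changelog", 1);
        final String bad = firstMismatch(seen);
        System.out.println(bad == null ? "all hops agree" : "key moved at: " + bad);
    }
}
```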

You could also experiment with the trace logs, but they are super verbose. Or 
you could try running the app in an IDE and setting breakpoints to figure out 
what is happening each step of the way.

The funny thing about a leftJoin in particular is that it would only be null if 
you’re getting records from the right, but none from the left. Any record from 
the left would instead produce a K:(LeftVal, null) result. It seems like even 
if the repartition is somehow going to the wrong partition, you should see the 
(left, null) result at some point. I’m struggling to think why you would only 
see null results. 


I hope this helps!
-John


On Mon, Aug 10, 2020, at 09:44, Mathieu D wrote:
> Dear community,
> 
> I have a rather tough problem that I'm struggling to diagnose and fix.
> I'm not sure if it's a bug in Kafka Streams or some subtlety of the DSL API
> that I missed.
> 
> The problem is the following.
> We have a fairly elaborate Streams app that works well in production. We'd
> like to add a left join with a KTable (the data comes from a DB via the Kafka
> Connect JDBC source connector).
> So we end up with a topology like this:
> 
> Event Source ---- (some transformations and joins) ----- leftJoin(A: KTable)
>     ----- leftJoin(B: KTable) ---- sinks
> 
> The new leftJoin is the one joining A.
> The transformations are several custom Transformers.
> 
> In tests with TopologyTestDriver, all is good, we can validate the general
> logic.
> In integration tests with a real Kafka (running in Docker, though), we can't
> get both left joins to work at the same time!
> The leftJoin with `A` always returns null.
> 
> I ran dozens of tests, tweaking and fiddling with everything, and I found out
> that it's related to partitioning. If I force the number of partitions down
> to 1 (by setting all input topics to 1 partition), the join works.
> In one of the tests, I suspected one of the transformations, so I removed
> it. The topology shown by describe() changed quite significantly (going
> from 2 sub-topologies to 1), and this made the leftJoin with A work... and the
> leftJoin with B fail.
> 
> It drives me crazy.
> 
> Enabling topology optimization didn't help.
> The input topics for KTables A and B are read with a TimestampExtractor that
> returns 0, since this is static data, to make sure we don't run into timestamp
> ordering issues.
> We double-checked and triple-checked the keys at various stages, and we're
> sure they're good (by the way, it works with 1 partition).
> The partitioner is always the default everywhere (inputs, Kafka Connect, ...);
> we never touch it.
> 
> It really seems related to the repartitioning that StreamsBuilder places in
> the topology (probably in relation to the transformers?)
> 
> So, I imagine you can't help much without seeing the code, but if you think
> of anything that could help diagnose this further, please let me know.
> 
> Mathieu
>
