The non-deterministic behavior you're seeing might be the result of a
timing issue. In other words, in some runs your KTable is fully populated
by the time records from "streamToJoin" try to find a match in
"lookupTable", and in other runs it isn't. If you haven't already, you
might want to look at using a GlobalKTable to see whether that works
for your use case. On startup, I believe Kafka Streams will wait until the
GlobalKTable has fully consumed its topic before data starts flowing.
GlobalKTables have downsides (check the documentation), but if this is
just a lookup table whose data is fairly static, it might make sense.
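To make the timing explanation concrete, here is a toy, single-threaded
model in plain Kotlin. It is not the Kafka Streams API, and `Event`,
`leftJoin`, `leftJoinAfterBootstrap` and `sampleEvents` are made-up names
for illustration only. It assumes KStream-KTable left-join semantics (a
stream record only sees the table state at the moment it is processed) and
models the GlobalKTable startup behavior simply as "apply every table
update before any stream record".

```kotlin
import kotlin.random.Random

// Toy model only: plain Kotlin, NOT the Kafka Streams API. All names are hypothetical.
sealed class Event {
    abstract val key: String
    data class TableUpdate(override val key: String, val intField: Int) : Event()
    data class StreamRecord(override val key: String, val stringField: String) : Event()
}

data class JoinResult(val stringField: String, val intField: Int?)

// Mimics KStream-KTable leftJoin semantics: each stream record is joined against
// whatever the table holds at that moment; a later table update does NOT
// re-emit earlier join results, so an early stream record keeps its null lookup.
fun leftJoin(events: List<Event>): List<JoinResult> {
    val table = mutableMapOf<String, Int>()
    val out = mutableListOf<JoinResult>()
    for (e in events) {
        when (e) {
            is Event.TableUpdate -> { table[e.key] = e.intField }
            is Event.StreamRecord -> { out += JoinResult(e.stringField, table[e.key]) }
        }
    }
    return out
}

// Assumed GlobalKTable-like bootstrap: apply all table updates before any stream record.
fun leftJoinAfterBootstrap(events: List<Event>): List<JoinResult> {
    val ordered: List<Event> =
        events.filterIsInstance<Event.TableUpdate>() + events.filterIsInstance<Event.StreamRecord>()
    return leftJoin(ordered)
}

// Hypothetical sample data: n lookup records and n main records with matching keys.
fun sampleEvents(n: Int): List<Event> =
    (0 until n).flatMap { i ->
        listOf(Event.TableUpdate("k$i", i), Event.StreamRecord("k$i", "main-$i"))
    }

fun nullLookups(results: List<JoinResult>): Int = results.count { it.intField == null }

fun main() {
    // 10 "iterations", each processing the same 20 records in a different interleaving.
    val errors = (0 until 10).associateWith { seed ->
        nullLookups(leftJoin(sampleEvents(10).shuffled(Random(seed))))
    }
    println("plain leftJoin, null lookups per iteration: $errors")

    val bootstrapped = (0 until 10).associateWith { seed ->
        nullLookups(leftJoinAfterBootstrap(sampleEvents(10).shuffled(Random(seed))))
    }
    println("after bootstrap, null lookups per iteration: $bootstrapped") // all zeros
}
```

In this model the number of null lookups per iteration depends only on how
the two inputs happen to interleave, which is one way a pattern like the
one in your log could arise; it doesn't prove that's what your test hits.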

Alex

On Sun, Sep 8, 2019 at 7:28 AM Bartłomiej Kępa <bartlomiej.k...@gmail.com> wrote:

> Hi All,
> For some time I've been involved in developing an application that
> uses the Kafka Streams API, and I'm facing a problem with joining two
> Kafka topics. The problem is illustrated in a simple test that was prepared
> based on our production code. It is available here:
> https://bitbucket.org/b_a_r_t_k/streams-join-problem/
> As seen in the class JoinStreamBuilder:
>
> val builder = StreamsBuilder()
>
> val reducedLookupStoreName = "reduced_$lookupTableTopicName$streamId"
>
> val streamToJoin = builder.stream(mainTopicName, Consumed.with(Serdes.String(), genericAvroSerde))
>         .selectKey(MainKeySelector())
>
> val lookupTable = builder.stream(lookupTableTopicName, Consumed.with(Serdes.String(), genericAvroSerde))
>         .selectKey(LookupKeySelector())
>         .groupByKey(Serialized.with(Serdes.String(), genericAvroSerde))
>         .reduce({ _, new -> new },
>                 Materialized.`as`<String, GenericRecord, KeyValueStore<Bytes, ByteArray>>(reducedLookupStoreName)
>                         .withKeySerde(Serdes.String())
>                         .withValueSerde(genericAvroSerde))
>
> streamToJoin
>         .leftJoin(lookupTable, Joiner(streamId), Joined.with(Serdes.String(), genericAvroSerde, genericAvroSerde))
>         .to(targetTopicName, Produced.with(Serdes.String(), genericAvroSerde))
>
> val topology = builder.build()
>
> It is a simple stream-to-lookup-table join. The Joiner
> implementation looks as follows:
>
> class Joiner(private val streamId: Int) : ValueJoiner<GenericRecord, GenericRecord, GenericRecord> {
>     override fun apply(main: GenericRecord?, lookup: GenericRecord?): GenericRecord {
>         if (main == null) LOG.warn("for streamId: $streamId record from main is null")
>         if (lookup == null) LOG.warn("for streamId: $streamId record from lookup is null")
>
>         return GenericData.Record(MySampleData.schema)
>                 .apply {
>                     put(MySampleData::stringField.name, main?.get(MySampleData::stringField.name))
>                     put(MySampleData::booleanField.name, main?.get(MySampleData::booleanField.name))
>                     put(MySampleData::intField.name, lookup?.get(MySampleData::intField.name))
>                 }
>     }
> }
>
> The problem is that, in a non-deterministic way, the Joiner's apply()
> method sometimes receives null for the lookup parameter, while in other
> cases the parameter is not null, as expected.
> The repo I referred to above contains a test that exercises that
> topology. It iterates 10 times, building a new instance of the topology
> each time, and then feeds the two topics with sample data (10 records per
> topic), expecting a 1-to-1 join to be performed for each pair of records.
> As seen in the log output:
> 2019-09-08 13:49:09,634 [main] INFO  com.example.demo.JoinStreamTest [tenantId=]  - Number of not properly joined per iteration (iteration number -> number of errors): {0=1, 1=1, 2=1, 3=1, 4=0, 5=1, 6=1, 7=1, 8=1, 9=0}. Total errors: 8
>
> Some of the iterations produce no errors, while most of them do.
>
> Any help is welcome. At this point I have no clue what may cause such
> behaviour.
> Best regards
> BK
