The non-deterministic behavior you're seeing might be the result of a timing issue. In other words, in some cases your KTable is fully populated by the time data in "streamToJoin" tries to find a match in "lookupTable", and in other cases it isn't. If you haven't already, you might want to take a look at using a GlobalKTable to see if that works for your use case. On startup, I believe Kafka Streams waits until the GlobalKTable has fully consumed its topic before data starts flowing. There are downsides to GlobalKTables (check the documentation), but if this is just a lookup table where the data is fairly static then it might make sense.
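With a GlobalKTable the lookup side is materialized directly from the topic, and the join takes a KeyValueMapper that picks the lookup key for each stream record, so the selectKey/groupByKey/reduce steps on the lookup side go away. Roughly something like this (an untested sketch that reuses the names from your code; it assumes the lookup topic is already keyed by the join key, since a GlobalKTable cannot re-key its source topic):

    import org.apache.avro.generic.GenericRecord
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.kstream.Consumed
    import org.apache.kafka.streams.kstream.GlobalKTable
    import org.apache.kafka.streams.kstream.Produced

    val builder = StreamsBuilder()

    // Materialize the lookup topic as a GlobalKTable; Streams bootstraps it
    // fully on startup before stream processing begins.
    val lookupGlobalTable: GlobalKTable<String, GenericRecord> =
        builder.globalTable(lookupTableTopicName,
            Consumed.with(Serdes.String(), genericAvroSerde))

    val streamToJoin = builder.stream(mainTopicName,
            Consumed.with(Serdes.String(), genericAvroSerde))
        .selectKey(MainKeySelector())

    streamToJoin
        // The second argument maps each stream record to the key to look up
        // in the global table; here I assume the re-keyed stream key is that key.
        .leftJoin(lookupGlobalTable,
            { key, _ -> key },
            Joiner(streamId))
        .to(targetTopicName, Produced.with(Serdes.String(), genericAvroSerde))

    val topology = builder.build()

You'd keep your Joiner as-is; only the serde handling changes, because a KStream-to-GlobalKTable join doesn't take a Joined argument.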
Alex

On Sun, Sep 8, 2019 at 7:28 AM Bartłomiej Kępa <bartlomiej.k...@gmail.com> wrote:

> Hi All,
>
> For some time I have been involved in developing an application that uses
> the Kafka Streams API, and I am facing a problem with joining two Kafka
> topics. The problem is illustrated in a simple test that was prepared based
> on our production code. It is available here:
> https://bitbucket.org/b_a_r_t_k/streams-join-problem/
>
> As seen in the class JoinStreamBuilder:
>
> val builder = StreamsBuilder()
>
> val reducedLookupStoreName = "reduced_$lookupTableTopicName$streamId"
>
> val streamToJoin = builder.stream(mainTopicName,
>         Consumed.with(Serdes.String(), genericAvroSerde))
>     .selectKey(MainKeySelector())
>
> val lookupTable = builder.stream(lookupTableTopicName,
>         Consumed.with(Serdes.String(), genericAvroSerde))
>     .selectKey(LookupKeySelector())
>     .groupByKey(Serialized.with(Serdes.String(), genericAvroSerde))
>     .reduce({ _, new -> new },
>         Materialized.`as`<String, GenericRecord, KeyValueStore<Bytes, ByteArray>>(reducedLookupStoreName)
>             .withKeySerde(Serdes.String())
>             .withValueSerde(genericAvroSerde))
>
> streamToJoin
>     .leftJoin(lookupTable, Joiner(streamId),
>         Joined.with(Serdes.String(), genericAvroSerde, genericAvroSerde))
>     .to(targetTopicName, Produced.with(Serdes.String(), genericAvroSerde))
>
> val topology = builder.build()
>
> It is a simple lookup-table-to-stream join. The Joiner implementation looks
> as follows:
>
> class Joiner(private val streamId: Int) : ValueJoiner<GenericRecord, GenericRecord, GenericRecord> {
>     override fun apply(main: GenericRecord?, lookup: GenericRecord?): GenericRecord {
>         if (main == null) LOG.warn("for streamId: $streamId record from main is null")
>         if (lookup == null) LOG.warn("for streamId: $streamId record from lookup is null")
>
>         return GenericData.Record(MySampleData.schema)
>             .apply {
>                 put(MySampleData::stringField.name, main?.get(MySampleData::stringField.name))
>                 put(MySampleData::booleanField.name, main?.get(MySampleData::booleanField.name))
>                 put(MySampleData::intField.name, lookup?.get(MySampleData::intField.name))
>             }
>     }
> }
>
> The problem is that sometimes, in a non-deterministic way, the Joiner's
> apply() method gets null for the lookup parameter, while in other cases the
> parameter is not null, as expected.
>
> The repo I referred to above contains a test that is supposed to use that
> topology. It iterates 10 times, building a new instance of the topology each
> time, and then feeds the two topics with sample data (10 records for each
> topic), expecting a 1-to-1 join to be performed for each record pair.
> As seen in the log output:
>
> 2019-09-08 13:49:09,634 [main] INFO com.example.demo.JoinStreamTest [tenantId=] - Number of not properly joined per iteration (iteration number -> number of errors): {0=1, 1=1, 2=1, 3=1, 4=0, 5=1, 6=1, 7=1, 8=1, 9=0}. Total errors: 8
>
> Some of the iterations produce no errors, while most of them do.
>
> Any help welcome. At this point I have no clue what may cause such
> behaviour.
>
> Best regards
> BK