[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408303#comment-17408303 ]
Adam Bellemare edited comment on KAFKA-13261 at 9/1/21, 6:05 PM:
-----------------------------------------------------------------

Hi [~xnix]. We'll need some more information:

Have you already verified that the repartitioned Stream A is correctly co-partitioned with Stream B? This would be your first step: if you can consistently reproduce this, it could very well be an incorrect repartitioning.

Assuming that the events are correctly repartitioned and keyed, you may be seeing the following:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP213SupportnonkeyjoininginKTable-JoinerPropagationofStaleData]

Due to the distributed and independent-processing nature of Kafka Streams (tasks), it is possible that newer events are joined prior to older events. This can be caused by context switching between tasks at inopportune times, crashing instances, failures, etc. Stale events, from the perspective of the Left-Hand Side, will not be propagated, because they would otherwise overwrite newer data. This is one of the tradeoffs of this implementation.

For example, your Stream B inputs may be:

(1, [b], foo)
(2, [b], bar)
(1, [b], bar)

But depending on how Kafka Streams loads the events into the KTable (e.g. loads them all in a single batch, writes to the KTable, and writes to the subscription topic), you will either get all 3 joined events out, or you will only get:

(2, joinedResults[b], bar)
(1, joinedResults[b], bar) (<--- this is the "final" state given the current inputs)

You _should_ always see the final join result, and it should always be the same value each time you run the test. If this is not occurring, could you please post a simple test demonstrating the inputs? I think you could probably get away [with copying this test|https://github.com/a0x8o/kafka/blob/master/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java#L64] and modifying it to showcase your inputs.
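
To make the "intermediate results may differ, final state should not" point concrete, here is a toy model in plain Java. It is only a sketch and does not use any Kafka Streams API: the class `FinalStateSketch` and its methods are hypothetical names, and "batched" processing is simply modelled as keeping the latest value per key. It does not reproduce the actual join internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model only: no Kafka Streams APIs are used; all names here are hypothetical. */
public class FinalStateSketch {

    /** One-at-a-time processing: every update is emitted downstream. */
    public static List<Map.Entry<String, String>> emitEach(List<Map.Entry<String, String>> updates) {
        return new ArrayList<>(updates);
    }

    /** Batched processing: only the latest value per key survives, ordered by last update. */
    public static List<Map.Entry<String, String>> emitBatched(List<Map.Entry<String, String>> updates) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> u : updates) {
            latest.remove(u.getKey());             // remove first so the key moves to the end
            latest.put(u.getKey(), u.getValue());
        }
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> e : latest.entrySet()) {
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        return out;
    }

    /** Table contents after applying all emitted updates in order. */
    public static Map<String, String> finalState(List<Map.Entry<String, String>> emitted) {
        Map<String, String> table = new TreeMap<>();
        for (Map.Entry<String, String> e : emitted) {
            table.put(e.getKey(), e.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        // Same shape as the example inputs: ids 1, 2, 1 with values foo, bar, bar.
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("1", "foo"), Map.entry("2", "bar"), Map.entry("1", "bar"));

        System.out.println("emitted one at a time: " + emitEach(input));
        System.out.println("emitted as one batch:  " + emitBatched(input));
        System.out.println("final states equal:    "
                + finalState(emitEach(input)).equals(finalState(emitBatched(input))));
    }
}
```

In this model the number of emitted records depends on how the inputs are grouped, but the table built from them ends up the same either way. If a test shows the final state itself differing between runs, that points away from stale-event discarding and towards partitioning.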

Finally, the reason I suspect it's either the partitioner or the discarding of stale events is that with a single partition per topic, all data is necessarily co-located and guaranteed to be processed sequentially. In this case I would expect to always see every join result, in sequential order, without anything missing (as you observed).


> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Priority: Major
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTable and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it seems
> like it has something to do with the foreign key join in combination with
> several partitions.
> One suspicion would be that it is not possible to define what partitioner to
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<DriverPeriod, DriverCosts> repartitionTopicA() {
>     Repartitioned<DriverPeriod, DriverCosts> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<DriverPeriod, DriverCosts> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<DriverPeriod, DriverCosts, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)