[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415337#comment-17415337
 ] 

Victoria Xia commented on KAFKA-13261:
--------------------------------------

Hey [~vvcephei] [~abellemare] [~guozhang] I had a look at the code and it seems 
to support Adam's theory that the custom partitioners from the repartition() 
step aren't taken into account by the foreign key join. In particular, both the 
subscription sink topic and the response sink topic are created without 
partitioners specified in the StreamSinkNode:

[https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1051]
 
[https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1122]

IIUC, this means the default partitioner is used for both topics despite the 
custom partitioners on the source tables, which explains the missing join 
results.

One thing I don't understand: even if we fix this bug by propagating the 
partitioner information from the repartition() step to the foreign key join, 
wouldn't we still have an analogous bug if either of the topics for the source 
tables had custom partitioning logic created from outside Streams (i.e., 
without a repartition() step in the Streams topology)? In this case, Streams 
has no way of determining the partitioning of the source tables, which means we 
need an update to the interface for foreign key joins so that users can specify 
a partitioner to use in order to ensure copartitioning of the subscription and 
response topics with the relevant tables. Is this reasoning sound?

If so, does it make sense to add logic into Streams to propagate information 
about the partitioner from the repartition() step to the foreign key join, or 
would it be better to require users to use the new interface to pass the same 
partitioner from the repartition() step(s) to the foreign key join as well? The 
latter seems more consistent with how copartitioning for joins is typically the 
user's responsibility, and also avoids the need to update Streams with logic 
for tracking partitioners throughout the topology.

> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>         Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and steams A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTable and do a foreign key join on from A to B
> When doing this, all messages does not end up in the output topic.
> Repartitioning both to only use 1 partition each solve the problem so it seem 
> like it has something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B",  
> stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
> private static Materialized<KeyA, EventA> aMaterialized(String name) {
>   Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned<DriverPeriod, DriverCosts> repartitionTopicA() {
>     Repartitioned<DriverPeriod, DriverCosts> repartitioned = 
> Repartitioned.as("driverperiod");
>     return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
> private static StreamPartitioner<DriverPeriod, DriverCosts> 
> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> 
> joinMaterialized(String name) {
>     Materialized<DriverPeriod, DriverCosts, KeyValueStore<Bytes, byte[]>> 
> table = Materialized.as(name);
>     return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to