[ 
https://issues.apache.org/jira/browse/KAFKA-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9302:
-----------------------------------
    Description: 
For KTables, Kafka Streams assumes that both input tables are partitioned by 
their key. This is a natural choice that works well for primary key table-table 
joins. However, for foreign-key joins (as introduced in 2.4 via KIP-213 / 
KAFKA-3705), this assumption is actually quite strict.

If we relax this requirement, we would need to consider the following:
 * For primary key joins, the assumption is that both input topics use the same 
number of partitions and the same partitioning strategy based on the primary 
key. Kafka Streams can only check at runtime whether the number of partitions 
matches and optimistically assumes that the partitioning strategy is correct. 
This allows for a distributed join computation with one task per input topic 
partition-pair.

If we relax this requirement, we would need to allow users to pass in a custom 
partitioner and would need to repartition one table before we can execute the 
join. Hence, while this feature would make Kafka Streams more flexible, it's an 
expensive operation. Note that users can do the repartitioning "upstream" 
manually if required. However, if we consider the next bullet point, we might 
want to have built-in support for this case to lift the burden from the user.
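
To illustrate why a matching partition count alone is not sufficient, here is a 
minimal plain-Java sketch (not Kafka Streams code). All names are hypothetical, 
and String.hashCode() is only a stand-in for a real partitioner, which works on 
the serialized key bytes. It shows that two tables with the same number of 
partitions route a key to the same partition only if both sides also use the 
same strategy.

```java
import java.util.function.ToIntBiFunction;

class CoPartitioningSketch {

    // Stand-in for a default partitioner (assumption): key hash modulo partition count.
    static int hashPartition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical custom strategy: route by key length.
    static int lengthPartition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    // The only property that can be verified at runtime: equal partition counts.
    static boolean partitionCountsMatch(int leftPartitions, int rightPartitions) {
        return leftPartitions == rightPartitions;
    }

    // True if both tables route the given key to the same partition index,
    // i.e. a single join task would see both records for that key.
    static boolean coLocated(String key, int numPartitions,
                             ToIntBiFunction<String, Integer> leftStrategy,
                             ToIntBiFunction<String, Integer> rightStrategy) {
        return leftStrategy.applyAsInt(key, numPartitions)
                == rightStrategy.applyAsInt(key, numPartitions);
    }

    public static void main(String[] args) {
        int n = 4;
        System.out.println("counts match: " + partitionCountsMatch(n, n));
        System.out.println("same strategy, co-located: "
                + coLocated("b", n, CoPartitioningSketch::hashPartition,
                            CoPartitioningSketch::hashPartition));
        System.out.println("different strategies, co-located: "
                + coLocated("b", n, CoPartitioningSketch::hashPartition,
                            CoPartitioningSketch::lengthPartition));
    }
}
```

In the mismatched case, the partition-count check still passes, so the join 
would silently miss matches unless one side is repartitioned first.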

 
 * For foreign key joins, the same assumption as for primary key joins is made. 
This implies that we always need to repartition data both ways to send 
subscription requests and subscription responses to the correct tasks. At the 
same time, if the input KTables used a custom primary key partitioning 
strategy, the join would fail. Hence, there are two potential improvements:

 ## Allow users to pass in a custom (primary key based) partitioner that can 
be used to write into the subscription request and subscription response 
repartition topics correctly (if the "receiver side" table is partitioned by a 
custom strategy). This would be a straightforward extension of the existing 
foreign key join.
 ## Assuming that the right hand side is using the same number of partitions as 
the left hand side, and assuming that the left hand side is not partitioned by 
its primary key, but by the join attribute (i.e., the foreign key extracted 
from the value)*, no repartitioning would be required (i.e., no subscription 
requests and responses), and the join could be executed in a single task per 
input topic partition-pair.



*) Note that the alternative assumption, that the right hand side is not 
partitioned by its primary key but by the primary key of the left hand side, 
does not make much sense. While this would also allow for a co-partitioned 
execution without the need to repartition the data, the right hand side does 
not know the primary key of the left hand side, and thus such a partitioning 
cannot be achieved.
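
The second improvement can be sketched in plain Java (again not Kafka Streams 
code). The class and method names are hypothetical, the left table's value is 
reduced to just the foreign key, and String.hashCode() stands in for the 
partitioner. If the left table is partitioned by the foreign key and the right 
table by its primary key, with the same partition count on both sides, each 
task can join using only its own partition pair.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ForeignKeyCoLocationSketch {

    // Stand-in partitioner (assumption): String.hashCode() modulo partition count.
    static int partitionFor(String partitioningKey, int numPartitions) {
        return Math.floorMod(partitioningKey.hashCode(), numPartitions);
    }

    // Splits a table into per-partition maps. If byValue is true, a record is
    // routed by its value (the foreign key); otherwise by its primary key.
    static List<Map<String, String>> partition(Map<String, String> table,
                                               int numPartitions, boolean byValue) {
        List<Map<String, String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new HashMap<>());
        }
        for (Map.Entry<String, String> e : table.entrySet()) {
            String routingKey = byValue ? e.getValue() : e.getKey();
            partitions.get(partitionFor(routingKey, numPartitions))
                      .put(e.getKey(), e.getValue());
        }
        return partitions;
    }

    // left:  primary key -> foreign key (routed by foreign key)
    // right: primary key -> value       (routed by primary key)
    // Each loop iteration plays the role of one task that only sees partition p
    // of both tables; no subscription requests or responses are exchanged.
    static Map<String, String> joinPerPartition(Map<String, String> left,
                                                Map<String, String> right,
                                                int numPartitions) {
        List<Map<String, String>> leftParts = partition(left, numPartitions, true);
        List<Map<String, String>> rightParts = partition(right, numPartitions, false);
        Map<String, String> result = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            Map<String, String> rightLocal = rightParts.get(p);
            for (Map.Entry<String, String> e : leftParts.get(p).entrySet()) {
                String match = rightLocal.get(e.getValue());
                if (match != null) {
                    result.put(e.getKey(), match);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> orders = Map.of("o1", "c1", "o2", "c2", "o3", "c1");
        Map<String, String> customers = Map.of("c1", "Alice", "c2", "Bob");
        System.out.println(joinPerPartition(orders, customers, 3));
    }
}
```

Because the foreign key of a left record and the primary key of the matching 
right record are the same value, both are routed to the same partition index, 
so the local join finds every match.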

  was:
In the 2.4 release, we introduced foreign-key joins via KIP-213 (KAFKA-3705). 
However, the API does not allow working with a custom partitioning strategy, 
i.e., it always assumes that both input tables use the default partitioner.

We should allow users to pass in a custom partitioner for each side that can be 
used when writing into the repartition topics.


> Allow custom partitioning in table-table joins
> ----------------------------------------------
>
>                 Key: KAFKA-9302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9302
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>              Labels: needs-kip



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
