[ 
https://issues.apache.org/jira/browse/KAFKA-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Viamari updated KAFKA-9222:
-----------------------------------
    Priority: Minor  (was: Major)

> StreamPartitioner for internal repartition topics does not match defaults for 
> to() operation
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9222
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9222
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1
>            Reporter: Michael Viamari
>            Priority: Minor
>
> When a KStream has a Windowed key, different StreamPartitioners are selected 
> depending on how the stream sink is generated.
> When using `KStream#to()`, the topology uses a `StreamSinkNode`. If no 
> partitioner is provided, this node chooses a `WindowedStreamPartitioner` 
> when creating the `SinkNode` for the topology.
> {code:java}
> KTable<> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
> aggResult.toStream().to(aggStreamTopic);
> {code}
> When an internal repartition is created before a stateful operation, an 
> `OptimizableRepartitionNode` is used, which results in a `SinkNode` being 
> added to the topology. This node is created with a null partitioner, so it 
> always falls back to the Producer's default partitioner. This becomes an 
> issue when attempting to join a windowed stream/KTable with a stream that 
> was mapped into a windowed key.
> {code:java}
> KTable<> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
> windowedAgg.toStream().to(aggStreamTopic);
> KStream<> windowedStream = inputStream.map((k, v) -> {
>     Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
>     Window minW = getMinWindow(w.values());
>     return KeyValue.pair(new Windowed<>(k, minW), v);
> });
> windowedStream.leftJoin(windowedAgg, ....);
> {code}
> The only workaround I've found is to either use the default partitioner for 
> the `KStream#to()` operation, or to use `KStream#through()` for the 
> repartition operation.
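
To illustrate why the two sinks can disagree, here is a minimal, self-contained sketch with no Kafka dependency. It assumes that the windowed partitioner hashes only the inner key bytes, while a default partitioner hashes the full serialized windowed key. The class name, the key layout (inner key bytes followed by the 8-byte window start) and the `Arrays.hashCode` stand-in hash are hypothetical and chosen for illustration only; they are not the Kafka implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionerMismatch {

    // Stand-in hash (assumption): a simple array hash keeps the sketch
    // dependency-free. It is not the hash Kafka uses.
    static int partitionFor(byte[] bytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(bytes), numPartitions);
    }

    // Hypothetical windowed-key layout: inner key bytes followed by the
    // 8-byte big-endian window start timestamp.
    static byte[] serializeWindowedKey(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStartMs)
                .array();
    }

    // Models a partitioner that ignores the window and hashes only the inner key.
    static int baseKeyPartition(String key, long windowStartMs, int numPartitions) {
        return partitionFor(key.getBytes(StandardCharsets.UTF_8), numPartitions);
    }

    // Models a partitioner that hashes the full serialized windowed key.
    static int fullKeyPartition(String key, long windowStartMs, int numPartitions) {
        return partitionFor(serializeWindowedKey(key, windowStartMs), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // Same inner key, different windows: the base-key partition stays fixed,
        // while the full-key partition changes with the window start.
        for (long windowStart = 0; windowStart < 4; windowStart++) {
            System.out.printf("windowStart=%d baseKey=%d fullKey=%d%n",
                    windowStart,
                    baseKeyPartition("user-1", windowStart, numPartitions),
                    fullKeyPartition("user-1", windowStart, numPartitions));
        }
    }
}
```

Under these assumptions, records for the same inner key land on one partition on the `to()` side but are spread by window on the repartition side, so the two join inputs are no longer co-partitioned.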



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
