[ 
https://issues.apache.org/jira/browse/KAFKA-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16979700#comment-16979700
 ] 

John Roesler commented on KAFKA-9222:
-------------------------------------

Thanks for the report!

It does sound like a bug in the internal processor graph construction logic. We 
should be able to forward the knowledge that the stream is windowed through the 
topology and use the right partitioner for the repartition topic. 

Besides the case you point out, it's not clear to me that the default 
partitioner would even partition windowed data correctly, so there might be 
other implications for correctness as well. 
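To make the correctness concern concrete, here is a minimal, dependency-free Java sketch. It assumes that (1) the time-windowed key serializer writes the base-key bytes followed by the 8-byte window start, and (2) both `WindowedStreamPartitioner` and the producer's default partitioner choose a partition as `toPositive(murmur2(bytes)) % numPartitions`, where the former hashes only the base key and the latter hashes the whole serialized windowed key. The class and method names are hypothetical, and `murmur2` is a local re-implementation written for this example, not a call into Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: base-key hashing vs. full windowed-key hashing.
public class WindowedPartitioningSketch {

    // Local re-implementation of a murmur2 hash in the style used by Kafka's partitioner.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length; // seed XOR input length
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 0-3 bytes (intentional switch fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Assumed layout of a time-windowed key: base-key bytes + 8-byte window start.
    static byte[] serializeWindowedKey(byte[] baseKey, long windowStart) {
        return ByteBuffer.allocate(baseKey.length + Long.BYTES)
                .put(baseKey)
                .putLong(windowStart)
                .array();
    }

    // WindowedStreamPartitioner-style choice: the window is ignored.
    static int partitionByBaseKey(byte[] baseKey, int numPartitions) {
        return toPositive(murmur2(baseKey)) % numPartitions;
    }

    // Producer-default-style choice: the window start is part of the hashed bytes.
    static int partitionByFullKey(byte[] baseKey, long windowStart, int numPartitions) {
        return toPositive(murmur2(serializeWindowedKey(baseKey, windowStart))) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        int numPartitions = 8;
        for (long start = 0L; start < 3 * 60_000L; start += 60_000L) {
            System.out.printf("window start %d: base-key partition %d, full-key partition %d%n",
                    start,
                    partitionByBaseKey(key, numPartitions),
                    partitionByFullKey(key, start, numPartitions));
        }
    }
}
```

Because the base-key strategy ignores the window, every window of `user-42` maps to the same partition. The full-key strategy folds the window start into the hash, so different windows of the same key can land on different partitions, which is exactly what breaks co-partitioning for a downstream join.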

> StreamPartitioner for internal repartition topics does not match defaults for 
> to() operation
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9222
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9222
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1
>            Reporter: Michael Viamari
>            Priority: Minor
>
> When a KStream has a Windowed key, different StreamPartitioners are selected 
> depending on how the stream sink is generated.
> When using `KStream#to()`, the topology uses a `StreamSinkNode`, which 
> chooses a `WindowedStreamPartitioner` if no partitioner was provided when 
> creating the `SinkNode` for the topology.
> {code:java}
> KTable<Windowed<K>, VR> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
> aggResult.toStream().to(aggStreamTopic);
> {code}
> When an internal repartition topic is created before a stateful operation, 
> an `OptimizableRepartitionNode` is used, which results in a `SinkNode` being 
> added to the topology. This node is created with a null partitioner, so the 
> producer's default partitioner is always used. This becomes an issue when 
> attempting to join a windowed stream/KTable with a stream whose records were 
> mapped to a windowed key.
> {code:java}
> KTable<Windowed<K>, VR> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
> windowedAgg.toStream().to(aggStreamTopic);
> // Re-key each record to a Windowed key; getMinWindow() is a user-defined helper.
> KStream<Windowed<K>, V> windowedStream = inputStream.map((k, v) -> {
>     Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
>     Window minW = getMinWindow(w.values());
>     return KeyValue.pair(new Windowed<>(k, minW), v);
> });
> // The key change forces a repartition topic before the join.
> windowedStream.leftJoin(windowedAgg, ...);
> {code}
> The only workaround I've found is either to use the default partitioner for 
> the `KStream#to()` operation, or to use `KStream#through()` for the 
> repartition operation.
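The two sink paths described in the report can be summarized in a small, hypothetical Java model. None of these types exist in Kafka Streams; the enum and methods only encode the behavior stated in the report (with no explicit partitioner, the `KStream#to()` sink falls back to `WindowedStreamPartitioner` for a windowed key, while the repartition sink is built with a null partitioner) plus one possible shape of the fix suggested in the comment, namely forwarding the fact that the key is windowed to the repartition node.

```java
// Hypothetical model of partitioner selection; not Kafka's actual classes.
public class SinkPartitionerChoiceSketch {

    enum Partitioner { CUSTOM, WINDOWED_STREAM_PARTITIONER, PRODUCER_DEFAULT }

    // Reported KStream#to() behavior: an explicit partitioner wins; otherwise a
    // windowed key falls back to the windowed partitioner.
    static Partitioner sinkViaTo(boolean customProvided, boolean keyIsWindowed) {
        if (customProvided) {
            return Partitioner.CUSTOM;
        }
        return keyIsWindowed ? Partitioner.WINDOWED_STREAM_PARTITIONER : Partitioner.PRODUCER_DEFAULT;
    }

    // Reported repartition-topic behavior: the sink gets a null partitioner, so
    // the producer default is used; keyIsWindowed is deliberately ignored.
    static Partitioner sinkViaRepartition(boolean keyIsWindowed) {
        return Partitioner.PRODUCER_DEFAULT;
    }

    // Possible fix: carry the "key is windowed" fact into the repartition node
    // and apply the same fallback rule as to().
    static Partitioner sinkViaRepartitionFixed(boolean keyIsWindowed) {
        return sinkViaTo(false, keyIsWindowed);
    }

    public static void main(String[] args) {
        boolean keyIsWindowed = true;
        Partitioner viaTo = sinkViaTo(false, keyIsWindowed);
        Partitioner viaRepartition = sinkViaRepartition(keyIsWindowed);
        System.out.println("to(): " + viaTo + ", repartition topic: " + viaRepartition
                + ", same strategy: " + (viaTo == viaRepartition));
        System.out.println("repartition topic after fix: " + sinkViaRepartitionFixed(keyIsWindowed));
    }
}
```

The join only works when both inputs use the same strategy for a given key, so the model makes the mismatch (and why the `through()` workaround helps, since it goes through the same sink path as `to()`) easy to see.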



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
