[ https://issues.apache.org/jira/browse/KAFKA-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Viamari updated KAFKA-9222:
-----------------------------------
Priority: Minor (was: Major)
> StreamPartitioner for internal repartition topics does not match defaults for to() operation
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-9222
> URL: https://issues.apache.org/jira/browse/KAFKA-9222
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.1
> Reporter: Michael Viamari
> Priority: Minor
>
> When a KStream has a `Windowed` key, different `StreamPartitioner`s are selected
> depending on how the stream's sink node is generated.
> When using `KStream#to()`, the topology uses a `StreamSinkNode`, which
> selects a `WindowedStreamPartitioner` if no partitioner is provided when the
> `SinkNode` is added to the topology.
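> {code:java}
> KTable<Windowed<K>, VR> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
> // sink created via KStream#to() with no partitioner: WindowedStreamPartitioner is used
> aggResult.toStream().to(aggStreamTopic);
> {code}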
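> When an internal repartition topic is created before a stateful operation, an
> `OptimizableRepartitionNode` is used, which adds a `SinkNode` to the
> topology. This node is created with a null partitioner, so records written to
> the repartition topic always go through the producer's default partitioner.
> Because `WindowedStreamPartitioner` hashes only the serialized inner key, while
> the default partitioner hashes the full serialized `Windowed` key (the inner key
> plus the window's start time), the same windowed key can land on different
> partitions in the two topics. This breaks co-partitioning when joining a
> windowed KStream/KTable with a stream that was mapped onto a windowed key.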
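> {code:java}
> KTable<Windowed<K>, VR> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
> windowedAgg.toStream().to(aggStreamTopic); // partitioned by WindowedStreamPartitioner
> KStream<Windowed<K>, V> windowedStream = inputStream.map((k, v) -> {
>     Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
>     // getMinWindow: user-defined helper (not shown)
>     Window minW = getMinWindow(w.values());
>     return KeyValue.pair(new Windowed<>(k, minW), v);
> });
> // map() changed the key, so an internal repartition topic is created before the join
> windowedStream.leftJoin(windowedAgg, ...);
> {code}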
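The window assignment inside the `map()` above can be sketched in plain Java. This is a re-implementation for illustration only, assuming hopping windows of `sizeMs` that advance by `advanceMs` (our reading of how `TimeWindows#windowsFor` assigns a timestamp to windows). `SimpleWindow` stands in for Kafka's `TimeWindow`, and `getMinWindow` here is a hypothetical version of the reporter's helper that picks the window with the earliest start.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: hopping-window assignment plus a hypothetical getMinWindow helper. */
class WindowAssignmentSketch {

    // Stand-in for Kafka's TimeWindow: [start, end) in milliseconds.
    record SimpleWindow(long start, long end) {}

    // All windows of length sizeMs, advancing by advanceMs, that contain timestamp.
    static Map<Long, SimpleWindow> windowsFor(long timestamp, long sizeMs, long advanceMs) {
        // Earliest aligned start whose window still covers the timestamp.
        long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        Map<Long, SimpleWindow> windows = new LinkedHashMap<>();
        while (windowStart <= timestamp) {
            windows.put(windowStart, new SimpleWindow(windowStart, windowStart + sizeMs));
            windowStart += advanceMs;
        }
        return windows;
    }

    // Hypothetical helper: the window with the smallest start time.
    static SimpleWindow getMinWindow(Collection<SimpleWindow> windows) {
        return windows.stream()
                .min(Comparator.comparingLong(SimpleWindow::start))
                .orElseThrow();
    }

    public static void main(String[] args) {
        // 5-minute windows advancing every minute; event at t = 450 000 ms.
        Map<Long, SimpleWindow> w = windowsFor(450_000L, 300_000L, 60_000L);
        System.out.println(w.keySet());
        System.out.println(getMinWindow(w.values()));
    }
}
```

With 5-minute windows advancing every minute, an event at 450 000 ms falls into five windows, and the earliest is [180 000, 480 000). Because that start time is part of the serialized `Windowed` key, it feeds into the default partitioner's hash when the mapped stream is written to the repartition topic.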
> The only workarounds I've found are either to use the default partitioner for
> the `KStream#to()` operation, or to use `KStream#through()` for the
> repartition operation.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)