[ https://issues.apache.org/jira/browse/KAFKA-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Viamari updated KAFKA-9222:
-----------------------------------
    Priority: Minor  (was: Major)

> StreamPartitioner for internal repartition topics does not match defaults for
> to() operation
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9222
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9222
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1
>            Reporter: Michael Viamari
>            Priority: Minor
>
> When a KStream has a Windowed key, different StreamPartitioners are selected
> depending on how the stream sink is generated.
> When using `KStream#to()`, the topology uses a `StreamSinkNode`, which, when
> no partitioner is provided, chooses a `WindowedStreamPartitioner` for the
> `SinkNode` it adds to the topology.
> {code:java}
> KTable<> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
> aggResult.toStream().to(aggStreamTopic);
> {code}
> When an internal repartition topic is created before a stateful operation, an
> `OptimizableRepartitionNode` is used, which also adds a `SinkNode` to the
> topology. This node is created with a null partitioner, so its records are
> always partitioned by the producer's default partitioner. This becomes an
> issue when attempting to join a windowed stream/KTable with a stream that was
> mapped into a windowed key: the join requires co-partitioned inputs, but the
> repartition topic can place a windowed key on a different partition than
> `WindowedStreamPartitioner` would.
> {code:java}
> KTable<> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
> windowedAgg.toStream().to(aggStreamTopic);
> KStream<> windowedStream = inputStream.map((k, v) -> {
>     Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
>     Window minW = getMinWindow(w.values());
>     return KeyValue.pair(new Windowed<>(k, minW), v);
> });
> windowedStream.leftJoin(windowedAgg, ....);
> {code}
> The only workarounds I've found are to either use the default partitioner for
> the `KStream#to()` operation, or to use `KStream#through()` for the
> repartition operation.
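A minimal, dependency-free sketch of why the two sinks disagree. It assumes a time-windowed key serialized as the inner key bytes followed by an 8-byte window-start timestamp (the layout produced by `TimeWindowedSerializer`), a `String` inner key serialized as UTF-8, and hashing as `toPositive(murmur2(bytes)) % numPartitions`. `WindowedStreamPartitioner` applies that hash to the base key only, while the producer's default partitioner applies it to the full serialized key. The `murmur2` method is a hand port intended to mirror Kafka's `Utils.murmur2`; the class `WindowedPartitionDemo` and its helper methods are hypothetical names for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: compares the partition chosen for one windowed key when
// hashing only the base key (WindowedStreamPartitioner-style) versus hashing the
// full serialized windowed key (producer default partitioner, null StreamPartitioner).
public class WindowedPartitionDemo {

    // Hand port intended to mirror Kafka's Utils.murmur2 (assumption, not the library call).
    static int murmur2(final byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // toPositive(murmur2(bytes)) % numPartitions
    static int partitionFor(final byte[] keyBytes, final int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Assumed time-windowed key layout: inner key bytes + big-endian window start.
    static byte[] windowedKeyBytes(final String key, final long windowStart) {
        final byte[] base = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(base.length + Long.BYTES).put(base).putLong(windowStart).array();
    }

    // Partition a to() sink picks via a base-key partitioner: independent of the window.
    static int basePartition(final String key, final int numPartitions) {
        return partitionFor(key.getBytes(StandardCharsets.UTF_8), numPartitions);
    }

    // Partition a repartition sink with a null partitioner gets: depends on the window.
    static int defaultPartition(final String key, final long windowStart, final int numPartitions) {
        return partitionFor(windowedKeyBytes(key, windowStart), numPartitions);
    }

    public static void main(final String[] args) {
        final int numPartitions = 8;
        for (long start = 0L; start < 5 * 60_000L; start += 60_000L) {
            System.out.printf("window=%d base=%d default=%d%n", start,
                    basePartition("user-1", numPartitions),
                    defaultPartition("user-1", start, numPartitions));
        }
    }
}
```

Running `main` prints the same `base` partition for every window, while the `default` partition is computed from bytes that include the window start and so can change from window to window. Under these assumptions, that is enough for the repartitioned stream and the windowed aggregate to disagree on where a given windowed key lives.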
-- This message was sent by Atlassian Jira (v8.3.4#803005)