bbejeck commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r413860696
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ##########
@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor<K, V> topicExtractor,
             null,
             optimizableRepartitionNodeBuilder);
-        final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
-        builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+        if (repartitionNode == null || !name.equals(repartitionName)) {

Review comment:

> Hmmm... I am wondering if just bumping the index would be sufficient and the optimizer would merge the nodes automatically?

I hadn't thought of that, but it should work. I initially had concerns about topology compatibility, but I don't think that is an issue: users currently can't re-use a `KStream` node that needs repartitioning in joins, so there are no existing topologies whose repartition topic names would change.

> I am also not sure about the code structure: so far, the DSL layer does not know much about optimizations (even if we "leak" a little bit into it, as we build up the StreamsGraphNode graph). We would push some optimization decisions into the DSL layer, thus spreading out "optimization code"? On the other hand, just inserting one OptimizableRepartitionNode is much more efficient than inserting multiple and letting the optimizer remove them later?

Yeah, I agree the current approach leaks too much optimization logic into the DSL layer. I think it will be better to just create the topology "as is" and let the optimizer do its job.

> I am also wondering if we could do the same for other repartition topics? We probably should have a consistent approach.

How about I make the changes in this PR for the join repartition topics (incrementing the index of the repartition node name) and do a follow-up PR to address the other repartition topics?

> Last question: this method is also used for stream-table joins, and thus, if one joins a stream with two tables, would this change be backward incompatible?
> Or would two stream-table joins fail with the same InvalidTopologyException?

I believe two stream-table joins without this fix will fail with the same exception, but I'll add some tests to confirm.
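A minimal, self-contained sketch of the "bump the index and let the optimizer merge" idea discussed above: each join that needs repartitioning gets its own repartition node with a unique name, and a separate optimizer pass later collapses nodes that share the same key-changing parent. Everything here is hypothetical and for illustration only: the class `RepartitionSketch`, the methods `addRepartitionForJoin` and `mergeByParent`, the node name `KSTREAM-MAP-0000000001`, and the `-repartition-<index>` naming format are assumptions, not Kafka Streams' internal API (`OptimizableRepartitionNode`, `InternalStreamsBuilder`). The merge step groups only by parent node name and ignores any other details a real optimizer might need to compare.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model for illustration only; these are NOT Kafka Streams' internal classes.
class RepartitionSketch {

    // A repartition node: the key-changing parent node it reads from and its topic name.
    static final class RepartitionNode {
        final String parent;
        final String topic;

        RepartitionNode(final String parent, final String topic) {
            this.parent = parent;
            this.topic = topic;
        }
    }

    private int index = 0;
    private final List<RepartitionNode> nodes = new ArrayList<>();

    // "Bump the index": every join that needs repartitioning gets its own node
    // with a unique (hypothetical) topic name, so no topic is registered twice.
    RepartitionNode addRepartitionForJoin(final String parent) {
        final RepartitionNode node = new RepartitionNode(parent, parent + "-repartition-" + index++);
        nodes.add(node);
        return node;
    }

    List<RepartitionNode> nodes() {
        return nodes;
    }

    // Simplified optimizer pass: keep the first repartition node per parent,
    // so downstream joins would all read from that single shared topic.
    static List<RepartitionNode> mergeByParent(final List<RepartitionNode> nodes) {
        final Map<String, RepartitionNode> firstPerParent = new LinkedHashMap<>();
        for (final RepartitionNode node : nodes) {
            firstPerParent.putIfAbsent(node.parent, node);
        }
        return new ArrayList<>(firstPerParent.values());
    }

    public static void main(final String[] args) {
        final RepartitionSketch topology = new RepartitionSketch();
        // One key-changed stream joined twice, e.g. with two different tables.
        final RepartitionNode first = topology.addRepartitionForJoin("KSTREAM-MAP-0000000001");
        final RepartitionNode second = topology.addRepartitionForJoin("KSTREAM-MAP-0000000001");
        System.out.println(first.topic);   // KSTREAM-MAP-0000000001-repartition-0
        System.out.println(second.topic);  // KSTREAM-MAP-0000000001-repartition-1
        System.out.println(mergeByParent(topology.nodes()).size()); // 1
    }
}
```

In this model the unoptimized topology never registers the same topic twice, and the merge pass reduces the two repartition topics to one. That mirrors the trade-off raised in the review: the DSL layer stays free of optimization decisions, at the cost of building extra nodes that the optimizer removes afterwards.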