mjsax commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r413380852



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor<K, V> topicExtractor,
             null,
             optimizableRepartitionNodeBuilder);
 
-        final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
-        builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+        if (repartitionNode == null || !name.equals(repartitionName)) {

Review comment:
       Hmmm... I am wondering if just bumping the index would be sufficient and 
the optimizer would merge the node automatically?
   
   I am also not sure about the code structure: so far, the DSL layer does not 
know much about optimizations (even if we "leak" a little bit into it, as we 
build up the `StreamsGraphNode` graph). With this change, would we push some 
optimization decisions into the DSL layer, thus spreading out "optimization 
code"? On the other hand, isn't inserting just one `OptimizableRepartitionNode` 
much more efficient than inserting multiple and letting the optimizer remove 
them later?
   
   I am also wondering whether we could do the same for other repartition topics?
   
   Last question: this method is also used for stream-table joins. Thus, if 
one joins a stream with two tables, would this change be backward incompatible? 
Or would two stream-table joins fail with the same `InvalidTopologyException`?
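
To make the trade-off concrete, here is a minimal sketch in plain Java of the "create once, reuse if the name matches" idea behind the `repartitionNode == null || !name.equals(repartitionName)` check in the diff. It is a toy model under stated assumptions, not the actual `KStreamImpl` code: `Node`, `RepartitionReuseSketch`, `repartitionFor`, and `repartitionNodeCount` are hypothetical names, and the real graph classes are not used.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these are hypothetical stand-ins, not Kafka Streams classes.
final class RepartitionReuseSketch {

    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(final String name) {
            this.name = name;
        }
    }

    private final Node parent = new Node("stream");
    private Node repartitionNode;    // last repartition node created, if any
    private String repartitionName;  // name that node was created under

    // Mirrors the condition in the diff: build a new node only when none is
    // cached or when the requested name differs from the cached one.
    Node repartitionFor(final String name) {
        if (repartitionNode == null || !name.equals(repartitionName)) {
            repartitionNode = new Node(name + "-repartition");
            repartitionName = name;
            parent.children.add(repartitionNode);
        }
        return repartitionNode;
    }

    // Number of repartition nodes attached to the parent so far.
    int repartitionNodeCount() {
        return parent.children.size();
    }
}
```

In this sketch, calling `repartitionFor("join")` twice attaches a single node, while a later call with a different name attaches a second one. Only the most recent node is cached, so alternating names would still create duplicates that an optimizer would have to merge afterwards.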





