bbejeck commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r413860696



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor<K, V> 
topicExtractor,
             null,
             optimizableRepartitionNodeBuilder);
 
-        final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = 
optimizableRepartitionNodeBuilder.build();
-        builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+        if (repartitionNode == null || !name.equals(repartitionName)) {

Review comment:
       >Hmmm... I am wondering if just bumping the index would be sufficient 
and the optimizer would merge the node automatically?
   
   I hadn't thought of that, but it should work.  I initially had concerns for 
topology compatibility,  but I don't think that is the case since users can't 
create re-use a `KStream` node in joins that needs repartitioning.
   
   >I am also not sure about the code structure: so far, the DSL layer does not 
know much about optimizations (even if we "leak" a little bit into it, as we 
built up the StreamsGraphNode graph... We would push some optimization 
decisions into the DSL layer thus spreading out "optimization code"? On the 
other hand, just inserting one OptimizableRepartitionNode is much more 
efficient than inserting multiple and let the optimizer remove them later?
   
   Yeah, I agree the current approach is leaking too much optimization into the 
current code.  I think it will be better to just go ahead and create the 
topology "as is" and let the optimizer do its job.
   
   >I am also wondering, if we could do the same for other repartition topics?
   
   We probably should have a consistent approach.  How about I make the changes 
in this PR for the Join repartition topics (incrementing the index of the 
repartition node name) and do a follow-on PR to address the other repartition 
topics?
   
   >Last question: this method is also use for stream-table joins and thus, if 
one joins a stream with two tables, would this change be backward incompatible? 
Or would two stream-table joins fail with the same InvalidTopologyException?
   
   I believe two stream-table joins without this fix will fail with the same 
exception, but I'll add some tests to confirm.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to