abbccdda commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r439565299



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -514,65 +554,81 @@ private boolean checkMetadataVersions(final int 
minReceivedMetadataVersion,
     private void setRepartitionTopicMetadataNumberOfPartitions(final 
Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final 
Map<Integer, TopicsInfo> topicGroups,
                                                                final Cluster 
metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : 
topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = 
repartitionTopicMetadata.get(topicName)
-                                                                     
.numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this 
repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : 
topicGroups.values()) {
-                            final Set<String> otherSinkTopics = 
otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of 
this topology,
-                                // use the maximum of all its source topic 
partitions as the number of partitions
-                                for (final String sourceTopicName : 
otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is 
another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if 
(repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if 
(repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent())
 {
-                                            numPartitionsCandidate =
-                                                
repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = 
metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for 
source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have 
been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || 
numPartitionsCandidate > numPartitions) {
-                                            numPartitions = 
numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of 
partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            
repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1. Build a graph containing the TopicsInfo and TopicNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            
allRepartitionSourceTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                builtTopicNodes.computeIfAbsent(sourceTopic, topic -> new 
TopicNode(topic));
+                
builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                builtTopicNodes.computeIfAbsent(sinkTopic, topic -> new 
TopicNode(topic));
+                
builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+            }
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all 
repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, 
metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, 
InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> 
builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> 
topicsInfoNumberOfPartitions) {
+
+        if (repartitionTopicMetadata.containsKey(topic)) {
+            final Optional<Integer> maybeNumberPartitions = 
repartitionTopicMetadata.get(topic).numberOfPartitions();
+            // if numberOfPartitions already calculated, return directly
+            if (maybeNumberPartitions.isPresent()) {
+                return maybeNumberPartitions.get();
+            } else {
+                // else calculate the max numRepartitions of its upStream 
TopicsInfo and set the repartitionTopicMetadata for memoization before return
+                final TopicNode topicNode = builtTopicNodes.get(topic);
+                Integer maxNumberPartitions = 0;

Review comment:
       We could just use a primitive `int` here instead of `Integer`.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is a topic node abstraction for a graph built with 
TopicNode and TopicsInfo; the graph is useful
+     * when a traversal is needed. For example, the method 
setRepartitionTopicMetadataNumberOfPartitions
+     * internally performs a DFS over the graph.
+     *
+     TopicNode("t1")      TopicNode("t2")                                    
TopicNode("t6")             TopicNode("t7")
+                \           /                                                  
          \                           /
+                  TopicsInfo(source = (t1,t2), sink = (t3,t4))                 
          TopicsInfo(source = (t6,t7), sink = (t4))
+                                /           \                                  
                                      /
+                             /                 \                               
                           /
+                          /                        \                           
                /
+                      /                                \                       
    /
+                 /                                       \            /
+     TopicNode("t3")                                     TopicNode("t4")
+            \
+     TopicsInfo(source = (t3), sink = ())
+
+     t3 = max(t1,t2)
+     t4 = max(max(t1,t2), max(t6,t7))
+     */
+    private static class TopicNode {
+        public final String topicName;
+        public final Set<TopicsInfo> upStreams; // upStream TopicsInfo's 
sinkTopics contains this
+        public final Set<TopicsInfo> downStreams; // downStreams TopicsInfo's 
sourceTopics contains this

Review comment:
       Are you sure? I don't see this struct being read anywhere.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to