feyman2016 commented on a change in pull request #8832: URL: https://github.com/apache/kafka/pull/8832#discussion_r443097860
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -514,65 +554,81 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion, private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata, final Map<Integer, TopicsInfo> topicGroups, final Cluster metadata) { - boolean numPartitionsNeeded; - do { - numPartitionsNeeded = false; - - for (final TopicsInfo topicsInfo : topicGroups.values()) { - for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) { - final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName) - .numberOfPartitions(); - Integer numPartitions = null; - - if (!maybeNumPartitions.isPresent()) { - // try set the number of partitions for this repartition topic if it is not set yet - for (final TopicsInfo otherTopicsInfo : topicGroups.values()) { - final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics; - - if (otherSinkTopics.contains(topicName)) { - // if this topic is one of the sink topics of this topology, - // use the maximum of all its source topic partitions as the number of partitions - for (final String sourceTopicName : otherTopicsInfo.sourceTopics) { - Integer numPartitionsCandidate = null; - // It is possible the sourceTopic is another internal topic, i.e, - // map().join().join(map()) - if (repartitionTopicMetadata.containsKey(sourceTopicName)) { - if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) { - numPartitionsCandidate = - repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get(); - } - } else { - final Integer count = metadata.partitionCountForTopic(sourceTopicName); - if (count == null) { - throw new IllegalStateException( - "No partition count found for source topic " - + sourceTopicName - + ", but it should have been." - ); - } - numPartitionsCandidate = count; - } - - if (numPartitionsCandidate != null) { - if (numPartitions == null || numPartitionsCandidate > numPartitions) { - numPartitions = numPartitionsCandidate; - } - } - } - } - } - - // if we still have not found the right number of partitions, - // another iteration is needed - if (numPartitions == null) { - numPartitionsNeeded = true; - } else { - repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions); - } - } + final Set<String> allRepartitionSourceTopics = new HashSet<>(); + final Map<String, TopicNode> builtTopicNodes = new HashMap<>(); + // 1. Build a graph containing the TopicsInfo and TopicsNode + for (final TopicsInfo topicsInfo : topicGroups.values()) { + allRepartitionSourceTopics.addAll(topicsInfo.repartitionSourceTopics.keySet()); + for (final String sourceTopic : topicsInfo.sourceTopics) { + builtTopicNodes.computeIfAbsent(sourceTopic, topic -> new TopicNode(topic)); + builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo); + } + + for (final String sinkTopic : topicsInfo.sinkTopics) { + builtTopicNodes.computeIfAbsent(sinkTopic, topic -> new TopicNode(topic)); + builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo); + } + } + + // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics Review comment: fixed ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org