feyman2016 commented on a change in pull request #8832: URL: https://github.com/apache/kafka/pull/8832#discussion_r446507613
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -147,6 +147,71 @@ public String toString() { } } + /** + * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful + * when in certain cases traverse is needed. For example, {@link #setRepartitionTopicMetadataNumberOfPartitions(Map, Map, Cluster)} + * internally do a DFS search along with the graph. + * + TopicNode("t1") TopicNode("t2") TopicNode("t6") TopicNode("t7") + \ / \ / + TopicsInfoNode(source = (t1,t2), sink = (t3,t4)) TopicsInfoNode(source = (t6,t7), sink = (t4)) + / \ / + / \ / + / \ / + / \ / + / \ / + TopicNode("t3") TopicNode("t4") + \ + TopicsInfoNode(source = (t3), sink = ()) + + t3 = max(t1,t2) + t4 = max(max(t1,t2), max(t6,t7)) + */ + private static class TopicNode { + public final String topicName; + public final Set<TopicsInfoNode> upStreams; // upStream TopicsInfo's sinkTopics contains this + public final Set<TopicsInfoNode> downStreams; // downStreams TopicsInfo's sourceTopics contains this + public Optional<Integer> numOfRepartitions; + TopicNode(final String topicName) { + this.topicName = topicName; + this.upStreams = new HashSet<>(); + this.downStreams = new HashSet<>(); + this.numOfRepartitions = Optional.empty(); + } + + public void addUpStreamTopicsInfo(final TopicsInfo topicsInfo) { + this.upStreams.add(new TopicsInfoNode(topicsInfo)); + } + + public void addDownStreamTopicsInfo(final TopicsInfo topicsInfo) { + this.downStreams.add(new TopicsInfoNode(topicsInfo)); + } + } + + // Node wrapper for TopicsInfo, which can be used together with TopicNode to build a graph to calculate + // numOfRepartitions, and numOfRepartitions of underlying TopicsInfo can be cached for memoization. 
Review comment: fixed ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -147,6 +147,71 @@ public String toString() { } } + /** + * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful + * when in certain cases traverse is needed. For example, {@link #setRepartitionTopicMetadataNumberOfPartitions(Map, Map, Cluster)} + * internally do a DFS search along with the graph. + * + TopicNode("t1") TopicNode("t2") TopicNode("t6") TopicNode("t7") + \ / \ / + TopicsInfoNode(source = (t1,t2), sink = (t3,t4)) TopicsInfoNode(source = (t6,t7), sink = (t4)) + / \ / + / \ / + / \ / + / \ / + / \ / + TopicNode("t3") TopicNode("t4") + \ + TopicsInfoNode(source = (t3), sink = ()) + + t3 = max(t1,t2) + t4 = max(max(t1,t2), max(t6,t7)) + */ + private static class TopicNode { + public final String topicName; + public final Set<TopicsInfoNode> upStreams; // upStream TopicsInfo's sinkTopics contains this + public final Set<TopicsInfoNode> downStreams; // downStreams TopicsInfo's sourceTopics contains this + public Optional<Integer> numOfRepartitions; + TopicNode(final String topicName) { + this.topicName = topicName; + this.upStreams = new HashSet<>(); + this.downStreams = new HashSet<>(); + this.numOfRepartitions = Optional.empty(); + } + + public void addUpStreamTopicsInfo(final TopicsInfo topicsInfo) { + this.upStreams.add(new TopicsInfoNode(topicsInfo)); + } + + public void addDownStreamTopicsInfo(final TopicsInfo topicsInfo) { + this.downStreams.add(new TopicsInfoNode(topicsInfo)); + } + } + + // Node wrapper for TopicsInfo, which can be used together with TopicNode to build a graph to calculate + // numOfRepartitions, and numOfRepartitions of underlying TopicsInfo can be cached for memoization. 
+ private static class TopicsInfoNode { + public TopicsInfo topicsInfo; + private Optional<Integer> numOfRepartitions; + TopicsInfoNode(final TopicsInfo topicsInfo) { + this.topicsInfo = topicsInfo; + this.numOfRepartitions = Optional.empty(); + } + + public void setNumOfRepartitions(final int numOfRepartitions) { Review comment: fixed ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -511,68 +576,86 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion, /** * Computes the number of partitions and sets it for each repartition topic in repartitionTopicMetadata */ - private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata, + // visible for testing + public void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata, Review comment: sure ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ########## @@ -1695,6 +1698,60 @@ public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() { shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2); } + @Test + public void shouldCorrectlySetRepartitionTopicMetadataNumberOfPartitions() { + // Test out a topology built with 3 levels of sub-topology as below: + // input-t1 input-t2 + // | | + // topicInfo0 topicInfo4 + // / \ / + // t1 t2 + // | | + // topicInfo1 topicInfo2 + // | | + // t1_1 t2_1 + // \ / + // topicsInfo3 + // t1, t2... are the sourceTopics/sinkTopics of these sub-topologies; the numberOfPartitions of a given + // topic should be the maximum of its upstream topics' numberOfPartitions.
+ // For example(use NoP instead of numberOfPartitions for simplicity): Review comment: updated ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org