abbccdda commented on a change in pull request #8832: URL: https://github.com/apache/kafka/pull/8832#discussion_r445001673
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -511,68 +576,86 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion, /** * Computes the number of partitions and sets it for each repartition topic in repartitionTopicMetadata */ - private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata, + // visible for testing + public void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata, Review comment: Package-level (package-private) access should be sufficient here; the method does not need to be `public` to be visible for testing. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -147,6 +147,71 @@ public String toString() { } } + /** + * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful + * when in certain cases traverse is needed. For example, {@link #setRepartitionTopicMetadataNumberOfPartitions(Map, Map, Cluster)} + * internally do a DFS search along with the graph. 
+ * + TopicNode("t1") TopicNode("t2") TopicNode("t6") TopicNode("t7") + \ / \ / + TopicsInfoNode(source = (t1,t2), sink = (t3,t4)) TopicsInfoNode(source = (t6,t7), sink = (t4)) + / \ / + / \ / + / \ / + / \ / + / \ / + TopicNode("t3") TopicNode("t4") + \ + TopicsInfoNode(source = (t3), sink = ()) + + t3 = max(t1,t2) + t4 = max(max(t1,t2), max(t6,t7)) + */ + private static class TopicNode { + public final String topicName; + public final Set<TopicsInfoNode> upStreams; // upStream TopicsInfo's sinkTopics contains this + public final Set<TopicsInfoNode> downStreams; // downStreams TopicsInfo's sourceTopics contains this + public Optional<Integer> numOfRepartitions; + TopicNode(final String topicName) { + this.topicName = topicName; + this.upStreams = new HashSet<>(); + this.downStreams = new HashSet<>(); + this.numOfRepartitions = Optional.empty(); + } + + public void addUpStreamTopicsInfo(final TopicsInfo topicsInfo) { + this.upStreams.add(new TopicsInfoNode(topicsInfo)); + } + + public void addDownStreamTopicsInfo(final TopicsInfo topicsInfo) { + this.downStreams.add(new TopicsInfoNode(topicsInfo)); + } + } + + // Node wrapper for TopicsInfo, which can be used together with TopicNode to build a graph to calculate + // numOfRepartitions, and numOfRepartitions of underlying TopicsInfo can be cached for memoization. 
+ private static class TopicsInfoNode { + public TopicsInfo topicsInfo; + private Optional<Integer> numOfRepartitions; + TopicsInfoNode(final TopicsInfo topicsInfo) { + this.topicsInfo = topicsInfo; + this.numOfRepartitions = Optional.empty(); + } + + public void setNumOfRepartitions(final int numOfRepartitions) { Review comment: nit: we could require a non-negative value for `numOfRepartitions`. ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ########## @@ -1695,6 +1698,60 @@ public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() { shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2); } + @Test + public void shouldCorrectlySetRepartitionTopicMetadataNumberOfPartitions() { + // Test out a topology built with 3 levels of sub-topology as below: + // input-t1 input-t2 + // | | + // topicInfo0 topicInfo4 + // / \ / + // t1 t2 + // | | + // topicInfo1 topicInfo2 + // | | + // t1_1 t2_1 + // \ / + // topicsInfo3 + // t1, t2... are topics are the sourceTopics/sinkTopics of these sub-topologies, numberOfPartitions of a given + // topic should be the maximum of its upstream topics' numberOfPartitions. + // For example(use NoP instead of numberOfPartitions for simplicity): Review comment: Suggest replacing `NoP instead of numberOfPartitions for simplicity` with `NoP = numberOfPartitions`. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -147,6 +147,71 @@ public String toString() { } } + /** + * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful + * when in certain cases traverse is needed. For example, {@link #setRepartitionTopicMetadataNumberOfPartitions(Map, Map, Cluster)} + * internally do a DFS search along with the graph. 
+ * + TopicNode("t1") TopicNode("t2") TopicNode("t6") TopicNode("t7") + \ / \ / + TopicsInfoNode(source = (t1,t2), sink = (t3,t4)) TopicsInfoNode(source = (t6,t7), sink = (t4)) + / \ / + / \ / + / \ / + / \ / + / \ / + TopicNode("t3") TopicNode("t4") + \ + TopicsInfoNode(source = (t3), sink = ()) + + t3 = max(t1,t2) + t4 = max(max(t1,t2), max(t6,t7)) + */ + private static class TopicNode { + public final String topicName; + public final Set<TopicsInfoNode> upStreams; // upStream TopicsInfo's sinkTopics contains this + public final Set<TopicsInfoNode> downStreams; // downStreams TopicsInfo's sourceTopics contains this + public Optional<Integer> numOfRepartitions; + TopicNode(final String topicName) { + this.topicName = topicName; + this.upStreams = new HashSet<>(); + this.downStreams = new HashSet<>(); + this.numOfRepartitions = Optional.empty(); + } + + public void addUpStreamTopicsInfo(final TopicsInfo topicsInfo) { + this.upStreams.add(new TopicsInfoNode(topicsInfo)); + } + + public void addDownStreamTopicsInfo(final TopicsInfo topicsInfo) { + this.downStreams.add(new TopicsInfoNode(topicsInfo)); + } + } + + // Node wrapper for TopicsInfo, which can be used together with TopicNode to build a graph to calculate + // numOfRepartitions, and numOfRepartitions of underlying TopicsInfo can be cached for memoization. Review comment: Suggested wording: `...build a graph to calculate the partition number of each repartition topic, and numOfRepartitions of the underlying TopicsInfo is used for memoization.` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -147,6 +147,71 @@ public String toString() { } } + /** + * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful Review comment: As we discussed offline, tracking the downstream links (`downStreams`) is actually not necessary. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org