cadonna commented on a change in pull request #9848:
URL: https://github.com/apache/kafka/pull/9848#discussion_r558419237



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -471,135 +470,22 @@ private boolean checkMetadataVersions(final int 
minReceivedMetadataVersion,
         return versionProbing;
     }
 
-    /**
-     * @return a map of repartition topics and their metadata
-     */
-    private Map<String, InternalTopicConfig> 
computeRepartitionTopicMetadata(final Map<Integer, TopicsInfo> topicGroups,
-                                                                             
final Cluster metadata) {
-        final Map<String, InternalTopicConfig> repartitionTopicMetadata = new 
HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            for (final String topic : topicsInfo.sourceTopics) {
-                if (!topicsInfo.repartitionSourceTopics.containsKey(topic) && 
!metadata.topics().contains(topic)) {
-                    log.error("Source topic {} is missing/unknown during 
rebalance, please make sure all source topics " +
-                                  "have been pre-created before starting the 
Streams application. Returning error {}",
-                                  topic, 
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
-                    throw new MissingSourceTopicException("Missing source 
topic during assignment.");
-                }
-            }
-            for (final InternalTopicConfig topic : 
topicsInfo.repartitionSourceTopics.values()) {
-                repartitionTopicMetadata.put(topic.name(), topic);
-            }
-        }
-        return repartitionTopicMetadata;
-    }
-
     /**
      * Computes and assembles all repartition topic metadata then creates the 
topics if necessary.
      *
      * @return map from repartition topic to its partition info
      */
-    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final 
Map<Integer, TopicsInfo> topicGroups,
-                                                                           
final Cluster metadata) {
-        final Map<String, InternalTopicConfig> repartitionTopicMetadata = 
computeRepartitionTopicMetadata(topicGroups, metadata);
-
-        
setRepartitionTopicMetadataNumberOfPartitions(repartitionTopicMetadata, 
topicGroups, metadata);
-
-        // ensure the co-partitioning topics within the group have the same 
number of partitions,
-        // and enforce the number of partitions for those repartition topics 
to be the same if they
-        // are co-partitioned as well.
-        ensureCopartitioning(taskManager.builder().copartitionGroups(), 
repartitionTopicMetadata, metadata);
-
-        // make sure the repartition source topics exist with the right number 
of partitions,
-        // create these topics if necessary
-        internalTopicManager.makeReady(repartitionTopicMetadata);
-
-        // augment the metadata with the newly computed number of partitions 
for all the
-        // repartition source topics
-        final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions 
= new HashMap<>();
-        for (final Map.Entry<String, InternalTopicConfig> entry : 
repartitionTopicMetadata.entrySet()) {
-            final String topic = entry.getKey();
-            final int numPartitions = 
entry.getValue().numberOfPartitions().orElse(-1);
-
-            for (int partition = 0; partition < numPartitions; partition++) {
-                allRepartitionTopicPartitions.put(
-                    new TopicPartition(topic, partition),
-                    new PartitionInfo(topic, partition, null, new Node[0], new 
Node[0])
-                );
-            }
-        }
-        return allRepartitionTopicPartitions;
-    }
-
-    /**
-     * Computes the number of partitions and sets it for each repartition 
topic in repartitionTopicMetadata
-     */
-    private void setRepartitionTopicMetadataNumberOfPartitions(final 
Map<String, InternalTopicConfig> repartitionTopicMetadata,
-                                                               final 
Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster 
metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-            boolean progressMadeThisIteration = false;  // avoid infinitely 
looping without making any progress on unknown repartitions
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String repartitionSourceTopic : 
topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = 
repartitionTopicMetadata.get(repartitionSourceTopic)
-                                                                     
.numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this 
repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : 
topicGroups.values()) {
-                            final Set<String> otherSinkTopics = 
otherTopicsInfo.sinkTopics;
-
-                            if 
(otherSinkTopics.contains(repartitionSourceTopic)) {
-                                // if this topic is one of the sink topics of 
this topology,
-                                // use the maximum of all its source topic 
partitions as the number of partitions
-                                for (final String upstreamSourceTopic : 
otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is 
another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if 
(repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
-                                        if 
(repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent())
 {
-                                            numPartitionsCandidate =
-                                                
repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = 
metadata.partitionCountForTopic(upstreamSourceTopic);
-                                        if (count == null) {
-                                            throw new TaskAssignmentException(
-                                                "No partition count found for 
source topic "
-                                                    + upstreamSourceTopic
-                                                    + ", but it should have 
been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || 
numPartitionsCandidate > numPartitions) {
-                                            numPartitions = 
numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                            log.trace("Unable to determine number of 
partitions for {}, another iteration is needed",
-                                      repartitionSourceTopic);
-                        } else {
-                            
repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
-                            progressMadeThisIteration = true;
-                        }
-                    }
-                }
-            }
-            if (!progressMadeThisIteration && numPartitionsNeeded) {
-                throw new TaskAssignmentException("Failed to compute number of 
partitions for all repartition topics");
-            }
-        } while (numPartitionsNeeded);
+    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final 
Cluster metadata) {
+
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            taskManager.builder(),
+            internalTopicManager,
+            copartitionedTopicsEnforcer,
+            metadata,
+            logPrefix
+        );
+        repartitionTopics.setup();

Review comment:
       That is a valid question. When we introduce the explicit initialization 
and the manual and automatic configs, we need to distinguish between setting up 
the internal topics and verifying the internal topics. Hence, there will be two 
methods on the `RepartitionTopics` class. We will create the object and then, 
depending on the configs, either only verify the repartition topics or both verify 
and set them up. We could accomplish the same with a flag in the constructor, 
but I thought the code would be easier to understand with two methods.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to