cadonna commented on a change in pull request #9848: URL: https://github.com/apache/kafka/pull/9848#discussion_r558419237
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -471,135 +470,22 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion, return versionProbing; } - /** - * @return a map of repartition topics and their metadata - */ - private Map<String, InternalTopicConfig> computeRepartitionTopicMetadata(final Map<Integer, TopicsInfo> topicGroups, - final Cluster metadata) { - final Map<String, InternalTopicConfig> repartitionTopicMetadata = new HashMap<>(); - for (final TopicsInfo topicsInfo : topicGroups.values()) { - for (final String topic : topicsInfo.sourceTopics) { - if (!topicsInfo.repartitionSourceTopics.containsKey(topic) && !metadata.topics().contains(topic)) { - log.error("Source topic {} is missing/unknown during rebalance, please make sure all source topics " + - "have been pre-created before starting the Streams application. Returning error {}", - topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name()); - throw new MissingSourceTopicException("Missing source topic during assignment."); - } - } - for (final InternalTopicConfig topic : topicsInfo.repartitionSourceTopics.values()) { - repartitionTopicMetadata.put(topic.name(), topic); - } - } - return repartitionTopicMetadata; - } - /** * Computes and assembles all repartition topic metadata then creates the topics if necessary. 
* * @return map from repartition topic to its partition info */ - private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Map<Integer, TopicsInfo> topicGroups, - final Cluster metadata) { - final Map<String, InternalTopicConfig> repartitionTopicMetadata = computeRepartitionTopicMetadata(topicGroups, metadata); - - setRepartitionTopicMetadataNumberOfPartitions(repartitionTopicMetadata, topicGroups, metadata); - - // ensure the co-partitioning topics within the group have the same number of partitions, - // and enforce the number of partitions for those repartition topics to be the same if they - // are co-partitioned as well. - ensureCopartitioning(taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata); - - // make sure the repartition source topics exist with the right number of partitions, - // create these topics if necessary - internalTopicManager.makeReady(repartitionTopicMetadata); - - // augment the metadata with the newly computed number of partitions for all the - // repartition source topics - final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>(); - for (final Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) { - final String topic = entry.getKey(); - final int numPartitions = entry.getValue().numberOfPartitions().orElse(-1); - - for (int partition = 0; partition < numPartitions; partition++) { - allRepartitionTopicPartitions.put( - new TopicPartition(topic, partition), - new PartitionInfo(topic, partition, null, new Node[0], new Node[0]) - ); - } - } - return allRepartitionTopicPartitions; - } - - /** - * Computes the number of partitions and sets it for each repartition topic in repartitionTopicMetadata - */ - private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata, - final Map<Integer, TopicsInfo> topicGroups, - final Cluster metadata) { - boolean numPartitionsNeeded; - do { - 
numPartitionsNeeded = false; - boolean progressMadeThisIteration = false; // avoid infinitely looping without making any progress on unknown repartitions - - for (final TopicsInfo topicsInfo : topicGroups.values()) { - for (final String repartitionSourceTopic : topicsInfo.repartitionSourceTopics.keySet()) { - final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(repartitionSourceTopic) - .numberOfPartitions(); - Integer numPartitions = null; - - if (!maybeNumPartitions.isPresent()) { - // try set the number of partitions for this repartition topic if it is not set yet - for (final TopicsInfo otherTopicsInfo : topicGroups.values()) { - final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics; - - if (otherSinkTopics.contains(repartitionSourceTopic)) { - // if this topic is one of the sink topics of this topology, - // use the maximum of all its source topic partitions as the number of partitions - for (final String upstreamSourceTopic : otherTopicsInfo.sourceTopics) { - Integer numPartitionsCandidate = null; - // It is possible the sourceTopic is another internal topic, i.e, - // map().join().join(map()) - if (repartitionTopicMetadata.containsKey(upstreamSourceTopic)) { - if (repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent()) { - numPartitionsCandidate = - repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get(); - } - } else { - final Integer count = metadata.partitionCountForTopic(upstreamSourceTopic); - if (count == null) { - throw new TaskAssignmentException( - "No partition count found for source topic " - + upstreamSourceTopic - + ", but it should have been." 
- ); - } - numPartitionsCandidate = count; - } - - if (numPartitionsCandidate != null) { - if (numPartitions == null || numPartitionsCandidate > numPartitions) { - numPartitions = numPartitionsCandidate; - } - } - } - } - } - - if (numPartitions == null) { - numPartitionsNeeded = true; - log.trace("Unable to determine number of partitions for {}, another iteration is needed", - repartitionSourceTopic); - } else { - repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions); - progressMadeThisIteration = true; - } - } - } - } - if (!progressMadeThisIteration && numPartitionsNeeded) { - throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics"); - } - } while (numPartitionsNeeded); + private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Cluster metadata) { + + final RepartitionTopics repartitionTopics = new RepartitionTopics( + taskManager.builder(), + internalTopicManager, + copartitionedTopicsEnforcer, + metadata, + logPrefix + ); + repartitionTopics.setup(); Review comment: That is a valid question. When we introduce the explicit initialization and the manual and automatic configs, we need to distinguish between setting up the internal topics and verifying the internal topics. Hence, there will be two methods on the `RepartitionTopics` class. We will create the object and then, according to the configs, we will either just verify or verify and set up the repartition topics. We could accomplish the same with a flag in the constructor, but I thought the code might be easier to understand with two methods. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org