guozhangwang commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r769154123



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -317,7 +317,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
         int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
 
         boolean shutdownRequested = false;
-        boolean assignementErrorFound = false;
+        boolean assignmentErrorFound = false;

Review comment:
       Ah good find :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(

Review comment:
       Why inline this function? Maybe we can still avoid excluding 
`StreamsPartitionAssignor` from the function-length check if we extract 
enough LOC?
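
       As a rough illustration of what extracting the logic (rather than 
inlining it) could look like, here is a minimal, self-contained sketch. It is 
not the code from this PR: the class `MissingTopicsSketch`, the helper 
`handleMissingSourceTopics`, and the nested stand-in exception are 
hypothetical names chosen so the example compiles without Kafka on the 
classpath, and `System.err` stands in for the logger.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class MissingTopicsSketch {

    // Hypothetical stand-in for Kafka's MissingSourceTopicException,
    // declared here only so the sketch compiles on its own.
    static class MissingSourceTopicException extends RuntimeException {
        MissingSourceTopicException(final String message) {
            super(message);
        }
    }

    // Hypothetical extracted helper: reports missing source topics and
    // returns true if any were found. Throws when named topologies are not
    // in use, mirroring the branch structure shown in the diff.
    static boolean handleMissingSourceTopics(final Map<String, Set<String>> missingPerTopology,
                                             final boolean hasNamedTopologies) {
        if (missingPerTopology.isEmpty()) {
            return false;
        }
        // With named topologies, report per topology; otherwise only the topic sets.
        System.err.println("The following source topics are missing/unknown: "
            + (hasNamedTopologies ? missingPerTopology : missingPerTopology.values()));
        if (!hasNamedTopologies) {
            throw new MissingSourceTopicException("Missing source topics.");
        }
        return true;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> missing = new TreeMap<>();
        missing.put("topology-a", Collections.singleton("input-topic"));
        // Named topologies: the problem is reported but no exception is thrown.
        System.out.println(handleMissingSourceTopics(missing, true));
    }
}
```

       With a helper along these lines, the body of `assign` would shrink to a 
single call at this point, which is the kind of LOC reduction meant above.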

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );
+                if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                    throw new MissingSourceTopicException("Missing source topics.");
+                }

Review comment:
       I think that's covered in the stream-thread where we would poll much 
longer when all tasks have missing topics.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

