cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r590519538



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", 
topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || 
!topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = 
Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = 
Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = 
adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = 
describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), 
Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || 
!configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, 
deadline);

Review comment:
       Good point! Will do!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to