rodesai commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r588053733



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", 
topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || 
!topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = 
Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = 
Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = 
adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = 
describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), 
Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || 
!configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, 
deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", 
topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> 
futuresForTopic,
+                                     final Map<String, InternalTopicConfig> 
topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> 
validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not 
contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not 
available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed 
out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic 
validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic 
%s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);

Review comment:
       Why are we using `RETRY_BACKOFF_MS_CONFIG` here? This is just a sleep to 
avoid a tight loop while checking whether the futures are done, right? In that 
case we wouldn't actually be issuing a new request, so we can probably just use 
a constant set to some small value (e.g., a few hundred ms).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", 
topicConfigs.keySet());

Review comment:
       Why not just log this at INFO level? We're not on a performance-critical 
path (this gets executed once per rebalance, right?), and it can always come 
in handy when you're debugging.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", 
topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || 
!topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = 
Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = 
Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = 
adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = 
describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), 
Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || 
!configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, 
deadline);

Review comment:
       Should we move this into the inner loop above so that we time out if the 
futures never complete?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -451,6 +465,550 @@ public void 
shouldExhaustRetriesOnMarkedForDeletionTopic() {
         );
     }
 
+    @Test
+    public void shouldValidateSuccessfully() {
+        mockAdminClient.addTopic(
+            false,
+            topic1,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, 
cluster, Collections.emptyList())),
+            null
+        );
+        final InternalTopicConfig internalTopicConfig = new 
RepartitionTopicConfig(topic1, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+
+        final ValidationResult validationResult = 
internalTopicManager.validate(Collections.singletonMap(topic1, 
internalTopicConfig));
+
+        assertThat(validationResult.missingTopics, empty());
+        assertThat(validationResult.misconfigurationsForTopics, anEmptyMap());
+    }
+
+    @Test
+    public void shouldValidateSuccessfullyWithEmptyInternalTopics() {
+        mockAdminClient.addTopic(
+            false,
+            topic1,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, 
cluster, Collections.emptyList())),
+            null
+        );
+        final InternalTopicConfig internalTopicConfig = new 
RepartitionTopicConfig(topic1, Collections.emptyMap());

Review comment:
       This doesn't seem to be used.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", 
topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || 
!topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = 
Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = 
Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = 
adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = 
describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), 
Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || 
!configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, 
deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", 
topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> 
futuresForTopic,
+                                     final Map<String, InternalTopicConfig> 
topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> 
validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not 
contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not 
available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed 
out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic 
validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic 
%s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> 
topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate 
internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} 
milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult 
validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription 
topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count 
is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(
+                topicConfig.name(),
+                ignored -> new ArrayList<>()
+            ).add("Internal topic " + topicName + " requires " + 
requiredPartitionCount + " partitions, " +
+                "but the existing topic on the broker has " + 
actualPartitionCount + " partitions.");
+        }
+    }
+
+    private void validateCleanupPolicy(final ValidationResult validationResult,
+                                       final InternalTopicConfig topicConfig,
+                                       final Config brokerSideTopicConfig) {
+        if (topicConfig instanceof UnwindowedChangelogTopicConfig) {
+            validateCleanupPolicyForUnwindowedChangelogs(validationResult, 
topicConfig, brokerSideTopicConfig);
+        } else if (topicConfig instanceof WindowedChangelogTopicConfig) {
+            validateCleanupPolicyForWindowedChangelogs(validationResult, 
topicConfig, brokerSideTopicConfig);
+        }
+    }
+
+    private void validateCleanupPolicyForUnwindowedChangelogs(final 
ValidationResult validationResult,
+                                                              final 
InternalTopicConfig topicConfig,
+                                                              final Config 
brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = 
brokerSideTopicConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            addMisconfiguration(
+                validationResult,
+                topicName,
+                "Cleanup policy of existing internal topic " + topicName + " 
should not contain \""
+                + TopicConfig.CLEANUP_POLICY_DELETE + "\"."
+            );
+        }
+    }
+
+    private void validateCleanupPolicyForWindowedChangelogs(final 
ValidationResult validationResult,
+                                                            final 
InternalTopicConfig topicConfig,
+                                                            final Config 
brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = 
getBrokerSideConfigValue(brokerSideTopicConfig, 
TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            final long brokerSideRetentionMs =
+                Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, 
TopicConfig.RETENTION_MS_CONFIG, topicName));
+            final Map<String, String> streamsSideConfig =
+                topicConfig.getProperties(defaultTopicConfigs, 
windowChangeLogAdditionalRetention);
+            final long streamsSideRetentionMs = 
Long.parseLong(streamsSideConfig.get(TopicConfig.RETENTION_MS_CONFIG));
+            if (brokerSideRetentionMs < streamsSideRetentionMs) {
+                addMisconfiguration(
+                    validationResult,
+                    topicName,
+                    "Retention time of existing internal topic " + topicName + 
" is " + brokerSideRetentionMs +
+                    " but should be " + streamsSideRetentionMs + " or larger."
+                );
+            }
+            final String brokerSideRetentionBytes =
+                getBrokerSideConfigValue(brokerSideTopicConfig, 
TopicConfig.RETENTION_BYTES_CONFIG, topicName);
+            if (brokerSideRetentionBytes != null) {
+                addMisconfiguration(
+                    validationResult,
+                    topicName,
+                    "Retention byte of existing internal topic " + topicName + 
" is set but it should be unset."
+                );
+            }
+        }
+    }
+
+    private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
+                                            final String configName,
+                                            final String topicName) {
+        final ConfigEntry brokerSideConfigEntry = 
brokerSideTopicConfig.get(configName);
+        if (brokerSideConfigEntry == null) {
+            throw new IllegalStateException("The config " + configName + " for 
topic " +

Review comment:
       Should this include the guidance from `BUG_ERROR_MESSAGE`?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", 
topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || 
!topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = 
Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = 
Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = 
adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = 
describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), 
Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || 
!configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, 
deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", 
topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> 
futuresForTopic,
+                                     final Map<String, InternalTopicConfig> 
topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> 
validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not 
contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",

Review comment:
       Ditto about the log level (this also applies to the other uses of `debug` below).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", 
topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || 
!topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = 
Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = 
Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = 
adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = 
describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), 
Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || 
!configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, 
deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", 
topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> 
futuresForTopic,
+                                     final Map<String, InternalTopicConfig> 
topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> 
validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not 
contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not 
available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed 
out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic 
validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic 
%s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> 
topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate 
internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} 
milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult 
validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription 
topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count 
is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(

Review comment:
       Could this use the existing `addMisconfiguration` helper?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", 
topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || 
!topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = 
Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = 
Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = 
adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = 
describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), 
Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || 
!configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, 
deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", 
topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> 
futuresForTopic,
+                                     final Map<String, InternalTopicConfig> 
topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> 
validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not 
contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not 
available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed 
out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic 
validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic 
%s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> 
topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate 
internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} 
milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult 
validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription 
topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count 
is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(
+                topicConfig.name(),
+                ignored -> new ArrayList<>()
+            ).add("Internal topic " + topicName + " requires " + 
requiredPartitionCount + " partitions, " +
+                "but the existing topic on the broker has " + 
actualPartitionCount + " partitions.");
+        }
+    }
+
+    private void validateCleanupPolicy(final ValidationResult validationResult,
+                                       final InternalTopicConfig topicConfig,
+                                       final Config brokerSideTopicConfig) {
+        if (topicConfig instanceof UnwindowedChangelogTopicConfig) {
+            validateCleanupPolicyForUnwindowedChangelogs(validationResult, 
topicConfig, brokerSideTopicConfig);
+        } else if (topicConfig instanceof WindowedChangelogTopicConfig) {
+            validateCleanupPolicyForWindowedChangelogs(validationResult, 
topicConfig, brokerSideTopicConfig);
+        }
+    }
+
+    private void validateCleanupPolicyForUnwindowedChangelogs(final 
ValidationResult validationResult,
+                                                              final 
InternalTopicConfig topicConfig,
+                                                              final Config 
brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = 
brokerSideTopicConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            addMisconfiguration(
+                validationResult,
+                topicName,
+                "Cleanup policy of existing internal topic " + topicName + " 
should not contain \""
+                + TopicConfig.CLEANUP_POLICY_DELETE + "\"."
+            );
+        }
+    }
+
+    private void validateCleanupPolicyForWindowedChangelogs(final 
ValidationResult validationResult,
+                                                            final 
InternalTopicConfig topicConfig,
+                                                            final Config 
brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = 
getBrokerSideConfigValue(brokerSideTopicConfig, 
TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            final long brokerSideRetentionMs =
+                Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, 
TopicConfig.RETENTION_MS_CONFIG, topicName));
+            final Map<String, String> streamsSideConfig =

Review comment:
       Wouldn't we want to check the configs the user provided here rather than the defaults?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {

Review comment:
       Could you add a Javadoc explaining what this method does and what its return value is?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to