cadonna commented on a change in pull request #10317: URL: https://github.com/apache/kafka/pull/10317#discussion_r596292688
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -580,4 +562,118 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig, return topicsToCreate; } + + /** + * Sets up internal topics. + * + * Either the given topic are all created or the method fails with an exception. + * + * @param topicConfigs internal topics to setup + */ + public void setup(final Map<String, InternalTopicConfig> topicConfigs) { + log.info("Starting to setup internal topics {}.", topicConfigs.keySet()); + + final long now = time.milliseconds(); + final long deadline = now + retryTimeoutMs; + + final Map<String, Map<String, String>> newTopicConfigs = topicConfigs.values().stream() + .collect(Collectors.toMap( + InternalTopicConfig::name, + topicConfig -> topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention) + )); + final Set<String> topicStillToCreate = new HashSet<>(topicConfigs.keySet()); + while (!topicStillToCreate.isEmpty()) { + final Set<NewTopic> newTopics = topicStillToCreate.stream() + .map(topicName -> new NewTopic( + topicName, + topicConfigs.get(topicName).numberOfPartitions(), + Optional.of(replicationFactor) + ).configs(newTopicConfigs.get(topicName)) + ).collect(Collectors.toSet()); + + log.info("Going to create internal topics: " + newTopics); + final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics); + + final Map<String, KafkaFuture<Void>> createResultForTopic = createTopicsResult.values(); + while (!createResultForTopic.isEmpty()) { + for (final InternalTopicConfig topicConfig : topicConfigs.values()) { + final String topicName = topicConfig.name(); + if (!createResultForTopic.containsKey(topicName)) { + throw new IllegalStateException("Create topic results do not contain internal topic " + topicName + + " to setup. " + BUG_ERROR_MESSAGE); + } + final KafkaFuture<Void> createResult = createResultForTopic.get(topicName); + if (createResult.isDone()) { + try { + createResult.get(); + topicStillToCreate.remove(topicName); + } catch (final ExecutionException executionException) { + final Throwable cause = executionException.getCause(); + if (cause instanceof TopicExistsException) { + log.info("Internal topic {} already exists. Topic is probably marked for deletion. " + + "Will retry to create this topic later (to let broker complete async delete operation first)", + topicName); + } else if (cause instanceof TimeoutException) { + log.info("Creating internal topic {} timed out.", topicName); + } else { + log.error("Unexpected error during creation of internal topic: ", cause); + throw new StreamsException( + String.format("Could not create internal topic %s for the following reason: ", topicName), + cause + ); + } + } catch (final InterruptedException interruptedException) { + throw new InterruptException(interruptedException); + } finally { + createResultForTopic.remove(topicName); + } + } + } + + maybeThrowTimeoutException( + Collections.singletonList(topicStillToCreate), + deadline, + String.format("Could not create internal topics within %d milliseconds. This can happen if the " + Review comment: This sounds like a really useful idea. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org