k-apol commented on code in PR #19913:
URL: https://github.com/apache/kafka/pull/19913#discussion_r2142372861


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+    public void init() {
+        this.init(DEFAULT_INIT_TIMEOUT_MS);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+
+    public void init(final Duration timeout) {
+        final InitParameters initParameters = InitParameters.initParameters();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(InitParameters.initParameters());
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+
+    public void init(final InitParameters initParameters) {
+        this.doInit(initParameters);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+    public void init(final InitParameters initParameters, final Duration 
timeout) {
+        initParameters.enableTimeout();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(initParameters);
+    }
+
+    private void doInit(final InitParameters initParameters) {
+        final InternalTopicManager internalTopicManager = new 
InternalTopicManager(time, adminClient, applicationConfigs);
+        if (initParameters.hasTimeoutEnabled()) {
+            internalTopicManager.setInitTimeout(initParameters.getTimeout());
+        }
+
+        final Map<String, InternalTopicConfig> allInternalTopics = new 
HashMap<>();
+        final Set<String> allSourceTopics = new HashSet<>();
+        for (final Map<TopologyMetadata.Subtopology, 
InternalTopologyBuilder.TopicsInfo> subtopologyMap : 
topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) {
+            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
subtopologyMap.values()) {
+                allInternalTopics.putAll(topicsInfo.stateChangelogTopics);
+                allInternalTopics.putAll(topicsInfo.repartitionSourceTopics);
+                allSourceTopics.addAll(topicsInfo.sourceTopics);
+            }
+        }
+        try {
+            final ValidationResult validationResult = 
internalTopicManager.validate(allInternalTopics); // can throw timeout
+
+            final boolean noInternalTopicsExist = allInternalTopics.keySet() 
== validationResult.missingTopics();
+            final boolean internalTopicsMisconfigured = 
!validationResult.misconfigurationsForTopics().isEmpty();
+            final boolean allInternalTopicsExist = 
validationResult.missingTopics().isEmpty();
+            final boolean missingSourceTopics = 
!Collections.disjoint(validationResult.missingTopics(), allSourceTopics);
+
+            if (internalTopicsMisconfigured) {
+                throw new MisconfiguredInternalTopicException("Misconfigured 
Internal Topics: " + validationResult.misconfigurationsForTopics());
+            }
+            if (missingSourceTopics) {
+                allSourceTopics.retainAll(validationResult.missingTopics());
+                throw new MissingSourceTopicException("Missing source topics: 
" + allSourceTopics);
+            }
+            if (noInternalTopicsExist) {
+                internalTopicManager.setup(allInternalTopics);
+            } else if (allInternalTopicsExist) {
+                throw new InternalTopicsAlreadySetupException("All internal 
topics have already been setup");
+            } else {
+                if (initParameters.setupInternalTopicsIfIncompleteEnabled()) {
+                    final Map<String, InternalTopicConfig> topicsToCreate = 
new HashMap<>();
+                    for (final String missingTopic : 
validationResult.missingTopics()) {
+                        topicsToCreate.put(missingTopic, 
allInternalTopics.get(missingTopic));
+                    }
+                    internalTopicManager.makeReady(topicsToCreate);
+                } else {
+                    throw new MissingInternalTopicsException("Missing Internal 
Topics: ", new ArrayList<>(validationResult.missingTopics()));
+                }
+            }
+        } catch (final TimeoutException timeoutException) {
+            throw new TimeoutException(timeoutException.getMessage(), 
timeoutException);
+        } catch (final StreamsException streamsException) {
+            throw new StreamsException(streamsException.getMessage(), 
streamsException);
+        }
+    }
+
+    public static class InitParameters {
+        private boolean timeoutEnabled;
+        private Duration timeout;
+        private final boolean setupInternalTopicsIfIncomplete;
+
+        private InitParameters(final boolean setupInternalTopicsIfIncomplete) {
+            this.setupInternalTopicsIfIncomplete = 
setupInternalTopicsIfIncomplete;
+        }
+
+        // Default: don't create missing topics if only some are missing
+        public static InitParameters initParameters() {
+            return new InitParameters(false);
+        }
+
+        public InitParameters enableSetupInternalTopicsIfIncomplete() {
+            return new InitParameters(true);
+        }
+
+        public InitParameters disableSetupInternalTopicsIfIncomplete() {
+            return new InitParameters(false);
+        }
+
+        public boolean setupInternalTopicsIfIncompleteEnabled() {
+            return setupInternalTopicsIfIncomplete;
+        }
+
+        public final void enableTimeout() {

Review Comment:
   Makes sense, I will update as this and the other timeout-related code as 
conversation evolves. There may be a better way to approach handling a timeout 
here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to