mjsax commented on code in PR #19913:
URL: https://github.com/apache/kafka/pull/19913#discussion_r2141248052


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.

Review Comment:
   This description is a little brief. It would be good to add more context on 
what "broker-side state" actually is, etc.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -173,6 +173,7 @@ public class StreamsConfig extends AbstractConfig {
 
     public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
 
+    public static final Duration DEFAULT_INIT_TIMEOUT_MS = 
Duration.ofMillis(30000);

Review Comment:
   I don't think we should add this here, as it would "leak" into public API. 
Not even sure if we need a variable for it at all?



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+    public void init() {

Review Comment:
   Might also throw `TimeoutException`, right, since it applies the default 
timeout?



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+    public void init() {
+        this.init(DEFAULT_INIT_TIMEOUT_MS);

Review Comment:
   nit: avoid unnecessary `this.` prefix



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+    public void init() {
+        this.init(DEFAULT_INIT_TIMEOUT_MS);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+
+    public void init(final Duration timeout) {
+        final InitParameters initParameters = InitParameters.initParameters();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(InitParameters.initParameters());
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+
+    public void init(final InitParameters initParameters) {
+        this.doInit(initParameters);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+    public void init(final InitParameters initParameters, final Duration 
timeout) {
+        initParameters.enableTimeout();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(initParameters);
+    }
+
+    private void doInit(final InitParameters initParameters) {
+        final InternalTopicManager internalTopicManager = new 
InternalTopicManager(time, adminClient, applicationConfigs);
+        if (initParameters.hasTimeoutEnabled()) {
+            internalTopicManager.setInitTimeout(initParameters.getTimeout());
+        }
+
+        final Map<String, InternalTopicConfig> allInternalTopics = new 
HashMap<>();
+        final Set<String> allSourceTopics = new HashSet<>();
+        for (final Map<TopologyMetadata.Subtopology, 
InternalTopologyBuilder.TopicsInfo> subtopologyMap : 
topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) {
+            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
subtopologyMap.values()) {
+                allInternalTopics.putAll(topicsInfo.stateChangelogTopics);
+                allInternalTopics.putAll(topicsInfo.repartitionSourceTopics);
+                allSourceTopics.addAll(topicsInfo.sourceTopics);
+            }
+        }
+        try {

Review Comment:
   nit: add empty line to separate blocks



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+    public void init() {
+        this.init(DEFAULT_INIT_TIMEOUT_MS);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+
+    public void init(final Duration timeout) {
+        final InitParameters initParameters = InitParameters.initParameters();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(InitParameters.initParameters());
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+
+    public void init(final InitParameters initParameters) {
+        this.doInit(initParameters);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+    public void init(final InitParameters initParameters, final Duration 
timeout) {
+        initParameters.enableTimeout();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(initParameters);
+    }
+
+    private void doInit(final InitParameters initParameters) {
+        final InternalTopicManager internalTopicManager = new 
InternalTopicManager(time, adminClient, applicationConfigs);
+        if (initParameters.hasTimeoutEnabled()) {
+            internalTopicManager.setInitTimeout(initParameters.getTimeout());
+        }
+
+        final Map<String, InternalTopicConfig> allInternalTopics = new 
HashMap<>();
+        final Set<String> allSourceTopics = new HashSet<>();
+        for (final Map<TopologyMetadata.Subtopology, 
InternalTopologyBuilder.TopicsInfo> subtopologyMap : 
topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) {
+            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
subtopologyMap.values()) {
+                allInternalTopics.putAll(topicsInfo.stateChangelogTopics);
+                allInternalTopics.putAll(topicsInfo.repartitionSourceTopics);
+                allSourceTopics.addAll(topicsInfo.sourceTopics);
+            }
+        }
+        try {
+            final ValidationResult validationResult = 
internalTopicManager.validate(allInternalTopics); // can throw timeout
+
+            final boolean noInternalTopicsExist = allInternalTopics.keySet() 
== validationResult.missingTopics();
+            final boolean internalTopicsMisconfigured = 
!validationResult.misconfigurationsForTopics().isEmpty();
+            final boolean allInternalTopicsExist = 
validationResult.missingTopics().isEmpty();
+            final boolean missingSourceTopics = 
!Collections.disjoint(validationResult.missingTopics(), allSourceTopics);
+
+            if (internalTopicsMisconfigured) {
+                throw new MisconfiguredInternalTopicException("Misconfigured 
Internal Topics: " + validationResult.misconfigurationsForTopics());
+            }
+            if (missingSourceTopics) {
+                allSourceTopics.retainAll(validationResult.missingTopics());
+                throw new MissingSourceTopicException("Missing source topics: 
" + allSourceTopics);
+            }
+            if (noInternalTopicsExist) {
+                internalTopicManager.setup(allInternalTopics);
+            } else if (allInternalTopicsExist) {
+                throw new InternalTopicsAlreadySetupException("All internal 
topics have already been setup");
+            } else {
+                if (initParameters.setupInternalTopicsIfIncompleteEnabled()) {
+                    final Map<String, InternalTopicConfig> topicsToCreate = 
new HashMap<>();
+                    for (final String missingTopic : 
validationResult.missingTopics()) {
+                        topicsToCreate.put(missingTopic, 
allInternalTopics.get(missingTopic));
+                    }
+                    internalTopicManager.makeReady(topicsToCreate);
+                } else {
+                    throw new MissingInternalTopicsException("Missing Internal 
Topics: ", new ArrayList<>(validationResult.missingTopics()));
+                }
+            }
+        } catch (final TimeoutException timeoutException) {
+            throw new TimeoutException(timeoutException.getMessage(), 
timeoutException);
+        } catch (final StreamsException streamsException) {
+            throw new StreamsException(streamsException.getMessage(), 
streamsException);
+        }
+    }
+
+    public static class InitParameters {
+        private boolean timeoutEnabled;
+        private Duration timeout;
+        private final boolean setupInternalTopicsIfIncomplete;
+
+        private InitParameters(final boolean setupInternalTopicsIfIncomplete) {
+            this.setupInternalTopicsIfIncomplete = 
setupInternalTopicsIfIncomplete;
+        }
+
+        // Default: don't create missing topics if only some are missing
+        public static InitParameters initParameters() {
+            return new InitParameters(false);
+        }
+
+        public InitParameters enableSetupInternalTopicsIfIncomplete() {
+            return new InitParameters(true);
+        }
+
+        public InitParameters disableSetupInternalTopicsIfIncomplete() {
+            return new InitParameters(false);
+        }
+
+        public boolean setupInternalTopicsIfIncompleteEnabled() {
+            return setupInternalTopicsIfIncomplete;
+        }
+
+        public final void enableTimeout() {

Review Comment:
   This method is not defined in the KIP, so we should either not add it or 
update the KIP.
   
   KIP-1092 and KIP-1153 introduce `CloseOptions`, which also takes a timeout. 
Following that pattern, it might make sense to update the KIP to include the 
timeout in `InitParameters` and remove the `Duration timeout` parameter and the 
corresponding `init()` overloads? @cadonna WDYT?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java:
##########
@@ -135,4 +135,4 @@ public Set<TaskId> statefulTaskIds() {
     public Map<TaskId, Set<TopicPartition>> changelogPartionsForTask() {
         return Collections.unmodifiableMap(changelogPartitionsForStatefulTask);
     }
-}
\ No newline at end of file
+}

Review Comment:
   nit: avoid unnecessary format changes (seems to be a difference in EOF 
encoding)



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+    public void init() {
+        this.init(DEFAULT_INIT_TIMEOUT_MS);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+
+    public void init(final Duration timeout) {
+        final InitParameters initParameters = InitParameters.initParameters();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(InitParameters.initParameters());
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+
+    public void init(final InitParameters initParameters) {
+        this.doInit(initParameters);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+    public void init(final InitParameters initParameters, final Duration 
timeout) {
+        initParameters.enableTimeout();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(initParameters);
+    }
+
+    private void doInit(final InitParameters initParameters) {
+        final InternalTopicManager internalTopicManager = new 
InternalTopicManager(time, adminClient, applicationConfigs);
+        if (initParameters.hasTimeoutEnabled()) {
+            internalTopicManager.setInitTimeout(initParameters.getTimeout());
+        }
+
+        final Map<String, InternalTopicConfig> allInternalTopics = new 
HashMap<>();
+        final Set<String> allSourceTopics = new HashSet<>();
+        for (final Map<TopologyMetadata.Subtopology, 
InternalTopologyBuilder.TopicsInfo> subtopologyMap : 
topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) {
+            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
subtopologyMap.values()) {
+                allInternalTopics.putAll(topicsInfo.stateChangelogTopics);
+                allInternalTopics.putAll(topicsInfo.repartitionSourceTopics);
+                allSourceTopics.addAll(topicsInfo.sourceTopics);
+            }
+        }
+        try {
+            final ValidationResult validationResult = 
internalTopicManager.validate(allInternalTopics); // can throw timeout
+
+            final boolean noInternalTopicsExist = allInternalTopics.keySet() 
== validationResult.missingTopics();
+            final boolean internalTopicsMisconfigured = 
!validationResult.misconfigurationsForTopics().isEmpty();
+            final boolean allInternalTopicsExist = 
validationResult.missingTopics().isEmpty();
+            final boolean missingSourceTopics = 
!Collections.disjoint(validationResult.missingTopics(), allSourceTopics);
+
+            if (internalTopicsMisconfigured) {
+                throw new MisconfiguredInternalTopicException("Misconfigured 
Internal Topics: " + validationResult.misconfigurationsForTopics());
+            }
+            if (missingSourceTopics) {
+                allSourceTopics.retainAll(validationResult.missingTopics());
+                throw new MissingSourceTopicException("Missing source topics: 
" + allSourceTopics);
+            }
+            if (noInternalTopicsExist) {
+                internalTopicManager.setup(allInternalTopics);
+            } else if (allInternalTopicsExist) {
+                throw new InternalTopicsAlreadySetupException("All internal 
topics have already been setup");
+            } else {
+                if (initParameters.setupInternalTopicsIfIncompleteEnabled()) {
+                    final Map<String, InternalTopicConfig> topicsToCreate = 
new HashMap<>();
+                    for (final String missingTopic : 
validationResult.missingTopics()) {
+                        topicsToCreate.put(missingTopic, 
allInternalTopics.get(missingTopic));
+                    }
+                    internalTopicManager.makeReady(topicsToCreate);
+                } else {
+                    throw new MissingInternalTopicsException("Missing Internal 
Topics: ", new ArrayList<>(validationResult.missingTopics()));
+                }
+            }
+        } catch (final TimeoutException timeoutException) {
+            throw new TimeoutException(timeoutException.getMessage(), 
timeoutException);
+        } catch (final StreamsException streamsException) {
+            throw new StreamsException(streamsException.getMessage(), 
streamsException);
+        }

Review Comment:
   Not sure why we want to catch and rethrow? It seems we don't add any value 
by doing this, so we could remove the complete try-catch block and let any 
exception just bubble up unmodified?
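
A small stand-alone sketch of the point, assuming a hypothetical `SketchTimeoutException` in place of the real exception types: wrapping an exception in a new instance of the same type with the same message leaves the caller with the same type and message to handle, just with an extra layer in the cause chain.

```java
public class RethrowSketch {

    // Hypothetical stand-in for an exception thrown during validation.
    static class SketchTimeoutException extends RuntimeException {
        SketchTimeoutException(final String message) {
            super(message);
        }

        SketchTimeoutException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for a call that can time out.
    static void validate() {
        throw new SketchTimeoutException("validation timed out");
    }

    // Mirrors the pattern in the diff: same type, same message, original as cause.
    static void initWithRethrow() {
        try {
            validate();
        } catch (final SketchTimeoutException e) {
            throw new SketchTimeoutException(e.getMessage(), e);
        }
    }

    // No try-catch: the original exception propagates unmodified.
    static void initWithoutRethrow() {
        validate();
    }

    public static void main(final String[] args) {
        try {
            initWithRethrow();
        } catch (final SketchTimeoutException e) {
            System.out.println(e.getMessage() + " / wrapped: " + (e.getCause() != null));
        }
        try {
            initWithoutRethrow();
        } catch (final SketchTimeoutException e) {
            System.out.println(e.getMessage() + " / wrapped: " + (e.getCause() != null));
        }
    }
}
```

In both variants the caller catches the same exception type with the same message; the rethrow only adds a nested cause.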



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+    public void init() {
+        this.init(DEFAULT_INIT_TIMEOUT_MS);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+
+    public void init(final Duration timeout) {
+        final InitParameters initParameters = InitParameters.initParameters();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(InitParameters.initParameters());
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+
+    public void init(final InitParameters initParameters) {
+        this.doInit(initParameters);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+    public void init(final InitParameters initParameters, final Duration 
timeout) {
+        initParameters.enableTimeout();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(initParameters);
+    }
+
+    private void doInit(final InitParameters initParameters) {
+        final InternalTopicManager internalTopicManager = new 
InternalTopicManager(time, adminClient, applicationConfigs);
+        if (initParameters.hasTimeoutEnabled()) {
+            internalTopicManager.setInitTimeout(initParameters.getTimeout());
+        }
+
+        final Map<String, InternalTopicConfig> allInternalTopics = new 
HashMap<>();
+        final Set<String> allSourceTopics = new HashSet<>();
+        for (final Map<TopologyMetadata.Subtopology, 
InternalTopologyBuilder.TopicsInfo> subtopologyMap : 
topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) {
+            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
subtopologyMap.values()) {
+                allInternalTopics.putAll(topicsInfo.stateChangelogTopics);
+                allInternalTopics.putAll(topicsInfo.repartitionSourceTopics);
+                allSourceTopics.addAll(topicsInfo.sourceTopics);
+            }
+        }
+        try {
+            final ValidationResult validationResult = 
internalTopicManager.validate(allInternalTopics); // can throw timeout
+
+            final boolean noInternalTopicsExist = allInternalTopics.keySet() 
== validationResult.missingTopics();

Review Comment:
   ```suggestion
               final boolean noInternalTopicsExist = 
allInternalTopics.keySet().equals(validationResult.missingTopics());
   ```



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -481,7 +481,12 @@ public void shouldOverrideStreamsDefaultProducerConfigs() {
         assertEquals("10000", 
producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
         assertEquals("30000", 
producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
     }
-

Review Comment:
   nit: keep blank line between methods (also below: add blank line after this 
method)



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+    public void init() {
+        this.init(DEFAULT_INIT_TIMEOUT_MS);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+
+    public void init(final Duration timeout) {
+        final InitParameters initParameters = InitParameters.initParameters();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(InitParameters.initParameters());
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+
+    public void init(final InitParameters initParameters) {
+        this.doInit(initParameters);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+    public void init(final InitParameters initParameters, final Duration 
timeout) {
+        initParameters.enableTimeout();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(initParameters);
+    }
+
+    private void doInit(final InitParameters initParameters) {
+        final InternalTopicManager internalTopicManager = new 
InternalTopicManager(time, adminClient, applicationConfigs);
+        if (initParameters.hasTimeoutEnabled()) {
+            internalTopicManager.setInitTimeout(initParameters.getTimeout());
+        }
+
+        final Map<String, InternalTopicConfig> allInternalTopics = new 
HashMap<>();
+        final Set<String> allSourceTopics = new HashSet<>();
+        for (final Map<TopologyMetadata.Subtopology, 
InternalTopologyBuilder.TopicsInfo> subtopologyMap : 
topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) {
+            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
subtopologyMap.values()) {
+                allInternalTopics.putAll(topicsInfo.stateChangelogTopics);
+                allInternalTopics.putAll(topicsInfo.repartitionSourceTopics);
+                allSourceTopics.addAll(topicsInfo.sourceTopics);
+            }
+        }
+        try {
+            final ValidationResult validationResult = 
internalTopicManager.validate(allInternalTopics); // can throw timeout
+
+            final boolean noInternalTopicsExist = allInternalTopics.keySet() 
== validationResult.missingTopics();

Review Comment:
   Using `==`, we don't compare the contents of the two sets, but whether both 
references point to literally the same object. We should compare with 
`equals()` instead.
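
   A minimal, self-contained sketch of the difference between reference and 
content comparison for sets. The class name, helper methods, and topic names are 
made up for illustration; only the `java.util` collections behavior is relied on.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SetComparisonSketch {

    // Reference comparison: true only if both variables point to the same object.
    public static boolean sameObject(final Set<String> a, final Set<String> b) {
        return a == b;
    }

    // Content comparison: true if both sets contain exactly the same elements.
    public static boolean sameContent(final Set<String> a, final Set<String> b) {
        return a.equals(b);
    }

    public static void main(final String[] args) {
        // Stand-in for allInternalTopics; topic names and values are hypothetical.
        final Map<String, String> allInternalTopics = new HashMap<>();
        allInternalTopics.put("app-store-changelog", "config-a");
        allInternalTopics.put("app-repartition", "config-b");

        // Stand-in for validationResult.missingTopics(): same names, different object.
        final Set<String> missingTopics = new HashSet<>(allInternalTopics.keySet());

        System.out.println(sameObject(allInternalTopics.keySet(), missingTopics));  // prints "false"
        System.out.println(sameContent(allInternalTopics.keySet(), missingTopics)); // prints "true"
    }
}
```

   With `==`, a check like `noInternalTopicsExist` would be `false` even when 
every internal topic is missing, unless both calls happen to return the same 
object.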



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -555,21 +560,12 @@ public Set<String> makeReady(final Map<String, 
InternalTopicConfig> topics) {
             }
 
             if (!topicsNotReady.isEmpty()) {

Review Comment:
   General comment: I am missing an update to `InternalTopicManager` to avoid 
creating new topics when the feature is enabled and topics are missing. In this 
case, we should not attempt to create any topics, but fail with an error right 
away.
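
   A rough sketch of the fail-fast behavior described above, not the actual 
`InternalTopicManager` code. The flag `explicitInitEnabled`, the method 
`topicsToCreate`, and the exception class are hypothetical names standing in for 
whatever switch and error type the PR ends up using.

```java
import java.util.Set;
import java.util.TreeSet;

public class FailFastTopicSetupSketch {

    // Hypothetical stand-in for an exception such as MissingInternalTopicsException.
    public static class MissingTopicsException extends RuntimeException {
        public MissingTopicsException(final String message) {
            super(message);
        }
    }

    // Returns the topics that still need to be created. If the (assumed) feature
    // flag is set and any required topic is missing, fails immediately instead of
    // returning topics for creation.
    public static Set<String> topicsToCreate(final boolean explicitInitEnabled,
                                             final Set<String> requiredTopics,
                                             final Set<String> existingTopics) {
        final Set<String> missing = new TreeSet<>(requiredTopics);
        missing.removeAll(existingTopics);

        if (explicitInitEnabled && !missing.isEmpty()) {
            throw new MissingTopicsException("Missing internal topics: " + missing);
        }
        return missing;
    }
}
```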



##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -1839,6 +1845,145 @@ public Uuid get(final long timeout, final TimeUnit 
timeUnit) {
         assertThat(didAssertThreadTwo.get(), equalTo(true));
         assertThat(didAssertGlobalThread.get(), equalTo(true));
     }
+    @Test

Review Comment:
   nit: add missing blank line



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+    public void init() {
+        this.init(DEFAULT_INIT_TIMEOUT_MS);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+
+    public void init(final Duration timeout) {
+        final InitParameters initParameters = InitParameters.initParameters();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(InitParameters.initParameters());
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     */
+
+    public void init(final InitParameters initParameters) {
+        this.doInit(initParameters);
+    }
+
+    /**
+     * Initializes broker-side state.
+     *
+     * This methods takes parameters that specify which internal topics to 
setup if some
+     * but not all of them are absent.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the 
internal topics are missing
+     *                                            and the given initialization 
parameters do not specify to setup them
+     * @throws MisconfiguredInternalTopicException if an internal topics is 
misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are 
already setup
+     * @throws TimeoutException                    if initialization exceeds 
the given timeout
+     */
+    public void init(final InitParameters initParameters, final Duration 
timeout) {
+        initParameters.enableTimeout();
+        initParameters.setTimeout(timeout);
+
+        this.doInit(initParameters);
+    }
+
+    private void doInit(final InitParameters initParameters) {
+        final InternalTopicManager internalTopicManager = new 
InternalTopicManager(time, adminClient, applicationConfigs);
+        if (initParameters.hasTimeoutEnabled()) {
+            internalTopicManager.setInitTimeout(initParameters.getTimeout());
+        }
+
+        final Map<String, InternalTopicConfig> allInternalTopics = new 
HashMap<>();
+        final Set<String> allSourceTopics = new HashSet<>();
+        for (final Map<TopologyMetadata.Subtopology, 
InternalTopologyBuilder.TopicsInfo> subtopologyMap : 
topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) {
+            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
subtopologyMap.values()) {
+                allInternalTopics.putAll(topicsInfo.stateChangelogTopics);
+                allInternalTopics.putAll(topicsInfo.repartitionSourceTopics);
+                allSourceTopics.addAll(topicsInfo.sourceTopics);
+            }
+        }
+        try {
+            final ValidationResult validationResult = 
internalTopicManager.validate(allInternalTopics); // can throw timeout
+
+            final boolean noInternalTopicsExist = allInternalTopics.keySet() 
== validationResult.missingTopics();
+            final boolean internalTopicsMisconfigured = 
!validationResult.misconfigurationsForTopics().isEmpty();
+            final boolean allInternalTopicsExist = 
validationResult.missingTopics().isEmpty();
+            final boolean missingSourceTopics = 
!Collections.disjoint(validationResult.missingTopics(), allSourceTopics);
+
+            if (internalTopicsMisconfigured) {
+                throw new MisconfiguredInternalTopicException("Misconfigured 
Internal Topics: " + validationResult.misconfigurationsForTopics());
+            }
+            if (missingSourceTopics) {
+                allSourceTopics.retainAll(validationResult.missingTopics());
+                throw new MissingSourceTopicException("Missing source topics: 
" + allSourceTopics);
+            }
+            if (noInternalTopicsExist) {
+                internalTopicManager.setup(allInternalTopics);
+            } else if (allInternalTopicsExist) {
+                throw new InternalTopicsAlreadySetupException("All internal 
topics have already been setup");
+            } else {
+                if (initParameters.setupInternalTopicsIfIncompleteEnabled()) {
+                    final Map<String, InternalTopicConfig> topicsToCreate = 
new HashMap<>();
+                    for (final String missingTopic : 
validationResult.missingTopics()) {
+                        topicsToCreate.put(missingTopic, 
allInternalTopics.get(missingTopic));
+                    }
+                    internalTopicManager.makeReady(topicsToCreate);
+                } else {
+                    throw new MissingInternalTopicsException("Missing Internal 
Topics: ", new ArrayList<>(validationResult.missingTopics()));
+                }
+            }
+        } catch (final TimeoutException timeoutException) {
+            throw new TimeoutException(timeoutException.getMessage(), 
timeoutException);
+        } catch (final StreamsException streamsException) {
+            throw new StreamsException(streamsException.getMessage(), 
streamsException);
+        }
+    }
+
+    public static class InitParameters {
+        private boolean timeoutEnabled;
+        private Duration timeout;
+        private final boolean setupInternalTopicsIfIncomplete;
+
+        private InitParameters(final boolean setupInternalTopicsIfIncomplete) {
+            this.setupInternalTopicsIfIncomplete = 
setupInternalTopicsIfIncomplete;
+        }
+
+        // Default: don't create missing topics if only some are missing
+        public static InitParameters initParameters() {
+            return new InitParameters(false);
+        }
+
+        public InitParameters enableSetupInternalTopicsIfIncomplete() {
+            return new InitParameters(true);
+        }
+
+        public InitParameters disableSetupInternalTopicsIfIncomplete() {
+            return new InitParameters(false);
+        }
+
+        public boolean setupInternalTopicsIfIncompleteEnabled() {

Review Comment:
   While the KIP does specify this getter, I am wondering why? We usually don't 
use getters on the main config-classes, but use sub-classes. Might be good to 
re-read the KIP discussion (and if need be, update the KIP to "fix" this)
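
   For reference, a sketch of the sub-class pattern, similar in spirit to 
`Consumed` / `ConsumedInternal`. `InitParametersInternal`, the copy constructor, 
and the outer wrapper class are hypothetical and not part of the KIP or this PR.

```java
public class InitParametersSketch {

    // Public, user-facing config class: factory and fluent methods only, no getters.
    public static class InitParameters {
        protected final boolean setupInternalTopicsIfIncomplete;

        protected InitParameters(final boolean setupInternalTopicsIfIncomplete) {
            this.setupInternalTopicsIfIncomplete = setupInternalTopicsIfIncomplete;
        }

        // Copy constructor so the internal sub-class can wrap a user-provided instance.
        protected InitParameters(final InitParameters other) {
            this(other.setupInternalTopicsIfIncomplete);
        }

        // Default: don't create missing topics if only some are missing.
        public static InitParameters initParameters() {
            return new InitParameters(false);
        }

        public InitParameters enableSetupInternalTopicsIfIncomplete() {
            return new InitParameters(true);
        }
    }

    // Hypothetical internal sub-class: the only place that exposes the value.
    public static class InitParametersInternal extends InitParameters {

        public InitParametersInternal(final InitParameters initParameters) {
            super(initParameters);
        }

        public boolean setupInternalTopicsIfIncomplete() {
            return setupInternalTopicsIfIncomplete;
        }
    }
}
```

   Runtime code would wrap the user's object, e.g. 
`new InitParametersInternal(initParameters).setupInternalTopicsIfIncomplete()`, 
so the public class itself stays free of getters.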



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -628,6 +624,14 @@ protected Map<String, List<TopicPartitionInfo>> 
getTopicPartitionInfo(final Set<
         return topicPartitionInfo;
     }
 
+    public void setInitTimeout(final Duration timeoutMs) {
+        this.isInitializing = true;
+        this.initTimeout = timeoutMs;
+    }
+    private boolean isInitializing() {

Review Comment:
   Seems this method is only called once, and it's rather trivial. We should 
remove it, as it does not add any value IMHO



##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -425,7 +431,7 @@ public void shouldCloseStartupTasksAfterFirstRebalance() 
throws Exception {
         prepareThreadState(streamThreadOne, state1);
         prepareThreadState(streamThreadTwo, state2);
         try (final MockedConstruction<StateDirectory> constructed = 
mockConstruction(StateDirectory.class,
-            (mock, context) -> 
when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
+                (mock, context) -> 
when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {

Review Comment:
   Avoid unnecessary reformatting (this might be an IDE setting that you need to 
adjust, or maybe even better, disable auto-reformatting entirely); same in many 
places below.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -555,21 +560,12 @@ public Set<String> makeReady(final Map<String, 
InternalTopicConfig> topics) {
             }
 
             if (!topicsNotReady.isEmpty()) {
-                currentWallClockMs = time.milliseconds();
-
-                if (currentWallClockMs >= deadlineMs) {
-                    final String timeoutError = String.format("Could not 
create topics within %d milliseconds. " +
-                        "This can happen if the Kafka cluster is temporarily 
not available.", retryTimeoutMs);
-                    log.error(timeoutError);
-                    throw new TimeoutException(timeoutError);
-                }
-                log.info(
-                    "Topics {} could not be made ready. Will retry in {} 
milliseconds. Remaining time in milliseconds: {}",
+                maybeThrowTimeoutExceptionDuringMakeReady(

Review Comment:
   Why did you extract the code into this new helper? It seems to be called only 
once, so it's unclear to me what the benefit is compared to just inlining the 
code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


