zheguang commented on code in PR #21892:
URL: https://github.com/apache/kafka/pull/21892#discussion_r3359413799


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -297,6 +305,132 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;
 
+    /**
+     * Initializes broker-side state for this Kafka Streams application.
+     * <p>
+     * Kafka Streams creates internal topics on the broker for fault-tolerance and repartitioning.
+     * This method validates and optionally creates those internal topics before starting the application.
+     *
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the internal topics are missing
+     * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are already setup
+     */
+    public void init() {
+        this.doInit(new InitParameters());
+    }
+
+    /**
+     * Initializes broker-side state for this Kafka Streams application.
+     * <p>
+     * Kafka Streams creates internal topics on the broker for fault-tolerance and repartitioning.
+     * This method validates and optionally creates those internal topics before starting the application.
+     *
+     * @param initParameters parameters controlling initialization behavior
+     * @throws MissingSourceTopicException         if a source topic is missing
+     * @throws MissingInternalTopicsException      if some but not all of the internal topics are missing
+     *                                            and the parameters do not specify to set them up
+     * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured
+     * @throws InternalTopicsAlreadySetupException if all internal topics are already setup
+     * @throws TimeoutException                    if initialization exceeds the configured timeout
+     */
+    public void init(final InitParameters initParameters) {

Review Comment:
   It looks like two of the public methods proposed in the
   [KIP](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=165226988#KIP698:AddExplicitUserInitializationofBrokersideStatetoKafkaStreams-PublicInterfaces)
   are not implemented here:
 
   ```java
   public void init(final Duration timeout);
   public void init(final InitParameters initParameters, final Duration timeout);
   ```
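
One way the two missing overloads could be wired in is to have every variant delegate to the most specific one. The sketch below is only an illustration under assumptions: `InitOverloadsSketch` stands in for `KafkaStreams`, the nested `InitParameters` is an empty placeholder for the type in this PR, and `DEFAULT_TIMEOUT` and the two-argument `doInit` are hypothetical (the PR's `doInit` currently takes only `InitParameters`). It does not enforce the timeout or touch the broker; it only records the arguments so the delegation is visible.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for KafkaStreams; not the real class.
public class InitOverloadsSketch {

    // Empty placeholder for the InitParameters type introduced in the PR.
    public static final class InitParameters { }

    // Assumed default; the real value would presumably come from configuration.
    static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);

    private InitParameters lastParameters;
    private Duration lastTimeout;

    public void init() {
        init(new InitParameters(), DEFAULT_TIMEOUT);
    }

    public void init(final InitParameters initParameters) {
        init(initParameters, DEFAULT_TIMEOUT);
    }

    // First overload listed in the KIP but absent from the diff.
    public void init(final Duration timeout) {
        init(new InitParameters(), timeout);
    }

    // Second overload listed in the KIP but absent from the diff.
    public void init(final InitParameters initParameters, final Duration timeout) {
        Objects.requireNonNull(initParameters, "initParameters must not be null");
        Objects.requireNonNull(timeout, "timeout must not be null");
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        doInit(initParameters, timeout);
    }

    // Hypothetical two-argument variant; here it only records what it received.
    private void doInit(final InitParameters initParameters, final Duration timeout) {
        this.lastParameters = initParameters;
        this.lastTimeout = timeout;
    }

    public InitParameters lastParameters() {
        return lastParameters;
    }

    public Duration lastTimeout() {
        return lastTimeout;
    }
}
```

Funnelling all four signatures through one method keeps argument validation in a single place, so the existing `init()` and `init(InitParameters)` would not need separate timeout handling.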



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
