chia7712 commented on code in PR #19955:
URL: https://github.com/apache/kafka/pull/19955#discussion_r2410568760


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1588,28 +1598,50 @@ public synchronized boolean close(final Duration 
timeout) throws IllegalArgument
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
 
-        return close(Optional.of(timeoutMs), false);
+        return close(Optional.of(timeoutMs), 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     }
 
     /**
      * Shutdown this {@code KafkaStreams} by signaling all the threads to 
stop, and then wait up to the timeout for the
      * threads to join.
+     * This method is deprecated and replaced by {@link 
#close(org.apache.kafka.streams.CloseOptions)}.
      * @param options  contains timeout to specify how long to wait for the 
threads to shut down, and a flag leaveGroup to
      *                 trigger consumer leave call
      * @return {@code true} if all threads were successfully 
stopped—{@code false} if the timeout was reached
      * before all threads stopped
      * Note that this method must not be called in the {@link 
StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of 
{@link StateListener}.
      * @throws IllegalArgumentException if {@code timeout} can't be 
represented as {@code long milliseconds}
      */
+    @Deprecated(since = "4.2")
     public synchronized boolean close(final CloseOptions options) throws 
IllegalArgumentException {
+        final org.apache.kafka.streams.CloseOptions closeOptions = 
org.apache.kafka.streams.CloseOptions.timeout(options.timeout)
+                .withGroupMembershipOperation(options.leaveGroup ?
+                        
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP :
+                        
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        return close(closeOptions);
+    }
+
+    /**
+     * Shutdown this {@code KafkaStreams} by signaling all the threads to 
stop, and then wait up to the timeout for the
+     * threads to join.
+     * @param options  contains timeout to specify how long to wait for the 
threads to shut down,
+     *                 and a {@link 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation}
+     *                 to trigger consumer leave call or remain in the group
+     * @return {@code true} if all threads were successfully 
stopped—{@code false} if the timeout was reached
+     * before all threads stopped
+     * Note that this method must not be called in the {@link 
StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of 
{@link StateListener}.
+     * @throws IllegalArgumentException if {@code timeout} can't be 
represented as {@code long milliseconds}
+     */
+    public synchronized boolean close(final 
org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
         Objects.requireNonNull(options, "options cannot be null");
-        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
-        final long timeoutMs = validateMillisecondDuration(options.timeout, 
msgPrefix);
+        final CloseOptionsInternal optionsInternal = new 
CloseOptionsInternal(options);
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");

Review Comment:
   Is it safe to pass an `Optional<Duration>` to 
`prepareMillisCheckFailMsgPrefix`? I assume the accepted type is a `Duration`.



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1588,28 +1598,50 @@ public synchronized boolean close(final Duration 
timeout) throws IllegalArgument
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
 
-        return close(Optional.of(timeoutMs), false);
+        return close(Optional.of(timeoutMs), 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     }
 
     /**
      * Shutdown this {@code KafkaStreams} by signaling all the threads to 
stop, and then wait up to the timeout for the
      * threads to join.
+     * This method is deprecated and replaced by {@link 
#close(org.apache.kafka.streams.CloseOptions)}.
      * @param options  contains timeout to specify how long to wait for the 
threads to shut down, and a flag leaveGroup to
      *                 trigger consumer leave call
      * @return {@code true} if all threads were successfully 
stopped&mdash;{@code false} if the timeout was reached
      * before all threads stopped
      * Note that this method must not be called in the {@link 
StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of 
{@link StateListener}.
      * @throws IllegalArgumentException if {@code timeout} can't be 
represented as {@code long milliseconds}
      */
+    @Deprecated(since = "4.2")
     public synchronized boolean close(final CloseOptions options) throws 
IllegalArgumentException {
+        final org.apache.kafka.streams.CloseOptions closeOptions = 
org.apache.kafka.streams.CloseOptions.timeout(options.timeout)
+                .withGroupMembershipOperation(options.leaveGroup ?
+                        
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP :
+                        
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        return close(closeOptions);
+    }
+
+    /**
+     * Shutdown this {@code KafkaStreams} by signaling all the threads to 
stop, and then wait up to the timeout for the
+     * threads to join.
+     * @param options  contains timeout to specify how long to wait for the 
threads to shut down,
+     *                 and a {@link 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation}
+     *                 to trigger consumer leave call or remain in the group
+     * @return {@code true} if all threads were successfully 
stopped&mdash;{@code false} if the timeout was reached
+     * before all threads stopped
+     * Note that this method must not be called in the {@link 
StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of 
{@link StateListener}.
+     * @throws IllegalArgumentException if {@code timeout} can't be 
represented as {@code long milliseconds}
+     */
+    public synchronized boolean close(final 
org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
         Objects.requireNonNull(options, "options cannot be null");
-        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
-        final long timeoutMs = validateMillisecondDuration(options.timeout, 
msgPrefix);
+        final CloseOptionsInternal optionsInternal = new 
CloseOptionsInternal(options);
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
+        final long timeoutMs = 
validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);

Review Comment:
   the `timeout` could be `empty`, right?



##########
streams/src/main/java/org/apache/kafka/streams/CloseOptions.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+public class CloseOptions {
+    /**
+     * Enum to specify the group membership operation upon closing the Kafka 
Streams application.
+     *
+     * <ul>
+     *   <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the 
group.</li>
+     *   <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in 
the group.</li>
+     * </ul>
+     */
+    public enum GroupMembershipOperation {
+        LEAVE_GROUP,
+        REMAIN_IN_GROUP
+    }
+
+    /**
+     * Specifies the group membership operation upon shutdown.
+     * By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be 
applied, which follows the KafkaStreams default behavior.
+     */
+    protected GroupMembershipOperation operation = 
GroupMembershipOperation.REMAIN_IN_GROUP;
+
+    /**
+     * Specifies the maximum amount of time to wait for the close process to 
complete.
+     * This allows users to define a custom timeout for gracefully stopping 
the KafkaStreams.
+     */
+    protected Optional<Duration> timeout = 
Optional.of(Duration.ofMillis(Long.MAX_VALUE));
+
+    private CloseOptions() {
+    }
+
+    protected CloseOptions(final CloseOptions closeOptions) {
+        this.operation = closeOptions.operation;
+        this.timeout = closeOptions.timeout;
+    }
+
+    /**
+     * Static method to create a {@code CloseOptions} with a custom timeout.
+     *
+     * @param timeout the maximum time to wait for the KafkaStreams to close.
+     * @return a new {@code CloseOptions} instance with the specified timeout.
+     */
+    public static CloseOptions timeout(final Duration timeout) {
+        return new CloseOptions().withTimeout(timeout);
+    }
+
+    /**
+     * Static method to create a {@code CloseOptions} with a specified group 
membership operation.
+     *
+     * @param operation the group membership operation to apply. Must be one 
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
+     * @return a new {@code CloseOptions} instance with the specified group 
membership operation.
+     */
+    public static CloseOptions groupMembershipOperation(final 
GroupMembershipOperation operation) {
+        return new CloseOptions().withGroupMembershipOperation(operation);
+    }
+
+    /**
+     * Fluent method to set the timeout for the close process.
+     *
+     * @param timeout the maximum time to wait for the KafkaStreams to close. 
If {@code null}, the default timeout will be used.

Review Comment:
   > If {@code null}, the default timeout will be used.
   
   It looks like the implementation is ignoring the comment :cry: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to