[
https://issues.apache.org/jira/browse/KAFKA-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643933#comment-16643933
]
ASF GitHub Bot commented on KAFKA-7477:
---
mjsax closed pull request #5747: KAFKA-7477: Improve Streams close timeout
semantics
URL: https://github.com/apache/kafka/pull/5747
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5fb89598507..d419ff50870 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -218,13 +218,7 @@ private boolean waitOnState(final State targetState, final
long waitMs) {
synchronized (stateLock) {
long elapsedMs = 0L;
while (state != targetState) {
-if (waitMs == 0) {
-try {
-stateLock.wait();
-} catch (final InterruptedException e) {
-// it is ok: just move on to the next iteration
-}
-} else if (waitMs > elapsedMs) {
+if (waitMs > elapsedMs) {
final long remainingMs = waitMs - elapsedMs;
try {
stateLock.wait(remainingMs);
@@ -824,17 +818,30 @@ public void close() {
* threads to join.
* A {@code timeout} of 0 means to wait forever.
*
- * @param timeout how long to wait for the threads to shutdown
+ * @param timeout how long to wait for the threads to shutdown. Can't be
negative. If {@code timeout=0} just checking the state and return immediately.
* @param timeUnit unit of time used for timeout
* @return {@code true} if all threads were successfully
stopped—{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@code onChange}
callback of {@link StateListener}.
- * @deprecated Use {@link #close(Duration)} instead
+ * @deprecated Use {@link #close(Duration)} instead; note, that {@link
#close(Duration)} has different semantics and does not block on zero, e.g.,
`Duration.ofMillis(0)`.
*/
@Deprecated
public synchronized boolean close(final long timeout, final TimeUnit
timeUnit) {
-log.debug("Stopping Streams client with timeoutMillis = {} ms.",
timeUnit.toMillis(timeout));
+long timeoutMs = timeUnit.toMillis(timeout);
+
+log.debug("Stopping Streams client with timeoutMillis = {} ms. You are
using deprecated method. " +
+"Please, consider update your code.", timeoutMs);
+
+if (timeoutMs < 0) {
+timeoutMs = 0;
+} else if (timeoutMs == 0) {
+timeoutMs = Long.MAX_VALUE;
+}
+
+return close(timeoutMs);
+}
+private boolean close(final long timeoutMs) {
if (!setState(State.PENDING_SHUTDOWN)) {
// if transition failed, it means it was either in PENDING_SHUTDOWN
// or NOT_RUNNING already; just check that all threads have been
stopped
@@ -890,7 +897,7 @@ public void run() {
shutdownThread.start();
}
-if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
+if (waitOnState(State.NOT_RUNNING, timeoutMs)) {
log.info("Streams client stopped completely");
return true;
} else {
@@ -912,7 +919,15 @@ public void run() {
*/
public synchronized boolean close(final Duration timeout) throws
IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeout, "timeout");
-return close(timeout.toMillis(), TimeUnit.MILLISECONDS);
+
+final long timeoutMs = timeout.toMillis();
+if (timeoutMs < 0) {
+throw new IllegalArgumentException("Timeout can't be negative.");
+}
+
+log.debug("Stopping Streams client with timeoutMillis = {} ms.",
timeoutMs);
+
+return close(timeoutMs);
}
/**
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index abc4cb90b7d..b9d542bc9b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -548,6 +548,34 @@ public void shouldCleanupOldStateDirs() throws
InterruptedException {
}
}
+@Test
+public void shouldThrowOnNegativeTimeoutForClose() {
+final KafkaStreams s