cadonna commented on a change in pull request #9863:
URL: https://github.com/apache/kafka/pull/9863#discussion_r555595987



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -249,20 +249,31 @@ public boolean isValidTransition(final State newState) {
     private boolean waitOnState(final State targetState, final long waitMs) {
         final long begin = time.milliseconds();
         synchronized (stateLock) {
+            boolean interrupted = false;
             long elapsedMs = 0L;
-            while (state != targetState) {
-                if (waitMs > elapsedMs) {
-                    final long remainingMs = waitMs - elapsedMs;
-                    try {
-                        stateLock.wait(remainingMs);
-                    } catch (final InterruptedException e) {
-                        // it is ok: just move on to the next iteration
+            try {
+                while (state != targetState) {
+                    if (waitMs > elapsedMs) {
+                        final long remainingMs = waitMs - elapsedMs;
+                        try {
+                            stateLock.wait(remainingMs);
+                        } catch (final InterruptedException e) {
+                            interrupted = true;
+                        }
+                    } else {
+                        log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
+                        return false;
                     }
-                } else {
-                    log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
-                    return false;
+                    elapsedMs = time.milliseconds() - begin;
+                }
+            } finally {
+                // Make sure to restore the interruption status before returning.
+                // We do not always own the current thread that executes this method, i.e., we do not know the
+                // interruption policy of the thread. The least we can do is restore the interruption status before
+                // the current thread exits this method.
+                if (interrupted) {
+                    Thread.currentThread().interrupt();

Review comment:
       In the words of "Java Concurrency in Practice":
   "Thread interruption is a cooperative mechanism for a thread to signal 
another thread that it should, at its convenience and if it feels like it, stop 
what it is doing and do something else." 
   Interruption works through the interruption status, which a thread can set 
on itself or on another thread with `Thread#interrupt()`. The status can be 
checked with `Thread#isInterrupted()`. Some blocking methods, most notably 
`Object#wait()` and `Thread#sleep()`, also check it: if the current thread is 
interrupted, they clear the interruption status and throw an 
`InterruptedException`. 
   The "something else" is the application of an interruption policy. Since 
we do not always own the thread that calls `close()`, we do not know its 
interruption policy. So the minimum we can do is restore the interruption 
status if the current thread was interrupted, so that code further up the call 
stack can apply whatever interruption policy that thread has.
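
   To illustrate the pattern outside of `KafkaStreams`, here is a minimal, 
standalone sketch. The class `InterruptRestoringWait` and its methods 
`markDone()` and `awaitDone()` are hypothetical names made up for this 
example; they are not part of Kafka. The sketch only assumes a boolean flag 
guarded by a lock in place of the real state machine:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Kafka.
final class InterruptRestoringWait {
    private final Object lock = new Object();
    private boolean done = false;

    // Flip the flag and wake up every thread blocked in awaitDone().
    void markDone() {
        synchronized (lock) {
            done = true;
            lock.notifyAll();
        }
    }

    // Waits at most waitMs for the flag; returns true if it was set in time.
    // An interrupt does not abort the wait, but the interruption status is
    // restored before returning so the caller's policy can still be applied.
    boolean awaitDone(final long waitMs) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMs);
        boolean interrupted = false;
        synchronized (lock) {
            try {
                while (!done) {
                    final long remainingNanos = deadlineNanos - System.nanoTime();
                    if (remainingNanos <= 0) {
                        return false;
                    }
                    try {
                        // Add 1 ms so we never call wait(0), which would wait forever.
                        lock.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1);
                    } catch (final InterruptedException e) {
                        // wait() cleared the status; remember it and keep waiting.
                        interrupted = true;
                    }
                }
                return true;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
```

   A caller that was interrupted while inside `awaitDone()` can still observe 
this afterwards with `Thread.currentThread().isInterrupted()` and react 
according to its own policy.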





