cadonna commented on a change in pull request #9863:
URL: https://github.com/apache/kafka/pull/9863#discussion_r555319837
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -249,20 +249,31 @@ public boolean isValidTransition(final State newState) {
private boolean waitOnState(final State targetState, final long waitMs) {
final long begin = time.milliseconds();
synchronized (stateLock) {
+ boolean interrupted = false;
long elapsedMs = 0L;
- while (state != targetState) {
- if (waitMs > elapsedMs) {
- final long remainingMs = waitMs - elapsedMs;
- try {
- stateLock.wait(remainingMs);
- } catch (final InterruptedException e) {
- // it is ok: just move on to the next iteration
+ try {
+ while (state != targetState) {
+ if (waitMs > elapsedMs) {
+ final long remainingMs = waitMs - elapsedMs;
+ try {
+ stateLock.wait(remainingMs);
+ } catch (final InterruptedException e) {
+ interrupted = true;
+ }
+ } else {
+ log.debug("Cannot transit to {} within {}ms",
targetState, waitMs);
+ return false;
}
- } else {
- log.debug("Cannot transit to {} within {}ms", targetState,
waitMs);
- return false;
+ elapsedMs = time.milliseconds() - begin;
+ }
+ } finally {
+ // Make sure to restore the interruption status before
returning.
Review comment:
We do not really interrupt, we restore the interruption state if the
current thread was interrupted. What would be the alternative to restoring
before we return?
If we do not wait until the condition is fulfilled, the while loop with the
wait degenerates to busy waiting because usually the interruption status is
checked at the beginning of the `wait()` method and we will run into the
`InterruptedException` in each iteration which would defeat the purpose of the
wait(). When the `InterruptedException` is thrown, the interruption status is
reset.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]