ableegoldman commented on a change in pull request #9863:
URL: https://github.com/apache/kafka/pull/9863#discussion_r556176651
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -249,20 +249,31 @@ public boolean isValidTransition(final State newState) {
private boolean waitOnState(final State targetState, final long waitMs) {
final long begin = time.milliseconds();
synchronized (stateLock) {
+ boolean interrupted = false;
long elapsedMs = 0L;
- while (state != targetState) {
- if (waitMs > elapsedMs) {
- final long remainingMs = waitMs - elapsedMs;
- try {
- stateLock.wait(remainingMs);
- } catch (final InterruptedException e) {
- // it is ok: just move on to the next iteration
+ try {
+ while (state != targetState) {
+ if (waitMs > elapsedMs) {
+ final long remainingMs = waitMs - elapsedMs;
+ try {
+ stateLock.wait(remainingMs);
+ } catch (final InterruptedException e) {
+ interrupted = true;
Review comment:
Maybe we should have a quick sync on this. My understanding is that an
interrupt means that the thread wants to regain control somewhere along the
callstack. So the only way I can see to interpret it is as "(1) stop
blocking/waiting on whatever you're doing, (2) get the system back into a
consistent state, and then (3) restore the interrupt flag so the interrupt can be handled
(or not) by the caller". Before this PR we were doing (1) and (2), now we're
doing (2) and (3), but why not all three?
If we don't break out of the loop then we've effectively ignored the
interrupt, since we will go on waiting for the state to reach NOT_RUNNING.
But it's actually worse: as you mentioned in another comment, it'll now be
in a busy loop.
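
To make the three steps concrete, here is a minimal, self-contained sketch of what doing all of (1), (2) and (3) might look like. It is not the code from this PR: the class `InterruptAwareWait`, its two-value `State` enum, and the `setState` helper are hypothetical stand-ins for illustration, and it assumes that returning `false` on interrupt is acceptable to the caller.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the state-waiting logic; not the KafkaStreams class itself.
public class InterruptAwareWait {
    public enum State { RUNNING, NOT_RUNNING }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    // Hypothetical helper: changes the state and wakes up any waiters.
    public void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    // Returns true if targetState was reached, false on timeout or interrupt.
    public boolean waitOnState(final State targetState, final long waitMs) {
        final long beginNs = System.nanoTime();
        synchronized (stateLock) {
            while (state != targetState) {
                final long elapsedMs =
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginNs);
                if (elapsedMs >= waitMs) {
                    return false; // timed out
                }
                try {
                    stateLock.wait(waitMs - elapsedMs);
                } catch (final InterruptedException e) {
                    // (1) stop waiting: leave the loop instead of retrying.
                    // (2) nothing to clean up in this sketch; the monitor is
                    //     released when the synchronized block exits.
                    // (3) wait() cleared the flag when it threw, so set it
                    //     again for the caller to observe.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(final String[] args) {
        final InterruptAwareWait waiter = new InterruptAwareWait();
        // Simulate an interrupt that arrives before (or during) the wait.
        Thread.currentThread().interrupt();
        final boolean reached = waiter.waitOnState(State.NOT_RUNNING, 10_000L);
        // Thread.interrupted() reads and clears the flag.
        System.out.println("reached=" + reached
            + ", interruptRestored=" + Thread.interrupted());
        // prints: reached=false, interruptRestored=true
    }
}
```

Returning from inside the `catch` block is what avoids the busy loop: if the flag were set again while the loop kept running, the next `wait()` call would throw immediately, over and over, until the timeout expired.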