lucliu1108 commented on code in PR #22417:
URL: https://github.com/apache/kafka/pull/22417#discussion_r3344484298


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -429,8 +437,21 @@ private StateConsumer initialize() {
                 );
             }
 
+            if (inErrorState()) {
+                closeStateConsumer(stateConsumer, false);
+                return null;
+            }
+
             setState(RUNNING);
             return stateConsumer;
+        } catch (final WakeupException e) {
+            closeStateConsumer(stateConsumer, false);
+            if (inErrorState()) {
+                log.info("Global thread initialization interrupted by 
shutdown");
+            } else {
+                startupException = new StreamsException(
+                    "Unexpected wakeup during initialization of 
GlobalStreamThread", e);
+            }

Review Comment:
   Thanks for flagging this!
   
   For the `catch {}` block, it was originally used for covering the other 
blocking consumer calls during bootstrap that aren't inside the local 
poll-catch: `GlobalStateManagerImpl#partitionsFor()`, `endOffsets()` and 
`position()` (also part of the bootstrapping path). If `shutdown() is fired 
during these paths, `WakeupException` will propagate up directly.  
   
   Right now after applying the suggestion of @chickenchickenlove , this catch 
is no longer needed and has been removed. All `WakeupException` during 
bootstrap are now caught inside `GlobalStateManagerImpl.initialize()` and 
converted to Optional.empty(), so no `WakeupExceiption` reaches the 
`GlobalStreamThread.initialize()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to