lucliu1108 commented on code in PR #22417:
URL: https://github.com/apache/kafka/pull/22417#discussion_r3344484298
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -429,8 +437,21 @@ private StateConsumer initialize() {
);
}
+ if (inErrorState()) {
+ closeStateConsumer(stateConsumer, false);
+ return null;
+ }
+
setState(RUNNING);
return stateConsumer;
+ } catch (final WakeupException e) {
+ closeStateConsumer(stateConsumer, false);
+ if (inErrorState()) {
+ log.info("Global thread initialization interrupted by shutdown");
+ } else {
+ startupException = new StreamsException(
+ "Unexpected wakeup during initialization of GlobalStreamThread", e);
+ }
Review Comment:
Thanks for flagging this!
The `catch` block was originally there to cover the other blocking consumer
calls during bootstrap that are not inside the local poll-catch:
`GlobalStateManagerImpl#partitionsFor()`, `endOffsets()` and `position()`
(also part of the bootstrapping path). If `shutdown()` is fired during these
calls, the `WakeupException` propagates up directly.
After applying @chickenchickenlove's suggestion, this catch is no longer
needed and has been removed. Every `WakeupException` thrown during bootstrap
is now caught inside `GlobalStateManagerImpl.initialize()` and converted to
`Optional.empty()`, so no `WakeupException` reaches
`GlobalStreamThread.initialize()`.
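
For illustration only, here is a minimal sketch of that pattern, not the
actual PR code. `WakeupException` below is a local stand-in for
`org.apache.kafka.common.errors.WakeupException` so the snippet compiles
without Kafka on the classpath, and `BootstrapSketch`, its `initialize()`
signature and `startThread()` are hypothetical names.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Local stand-in for Kafka's WakeupException (assumption, see note above).
class WakeupException extends RuntimeException {}

class BootstrapSketch {
    // Hypothetical counterpart of the state manager's initialize():
    // runs the blocking bootstrap work and maps a wakeup to Optional.empty().
    static <T> Optional<T> initialize(final Supplier<T> blockingBootstrap) {
        try {
            return Optional.of(blockingBootstrap.get());
        } catch (final WakeupException e) {
            // Shutdown interrupted a blocking consumer call; report "no result"
            // instead of letting the exception escape to the caller.
            return Optional.empty();
        }
    }

    // Hypothetical caller: needs no catch for WakeupException,
    // it only checks whether bootstrap produced a result.
    static String startThread(final Supplier<String> blockingBootstrap) {
        return initialize(blockingBootstrap)
            .map(offsets -> "RUNNING")
            .orElse("SHUTDOWN");
    }
}
```

With this shape the thread-level `initialize()` only has to branch on an
empty result, which is why the extra `catch` became redundant.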