lucliu1108 commented on code in PR #22417:
URL: https://github.com/apache/kafka/pull/22417#discussion_r3344391665


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -493,7 +516,20 @@ private void restoreState(final StateStoreMetadata storeMetadata) {
                 // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
                 //      `poll(pollMS)` without adding the request timeout and do a more precise
                 //      timeout handling
-                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+                if (inErrorStateSupplier.getAsBoolean()) {
+                    logBootstrapInterrupted(storeMetadata);
+                    return;

Review Comment:
   Hi @chickenchickenlove, thanks for the review!
   Good point. I refactored to make the interrupted path explicit:
   - `GlobalStateManager.initialize()` now returns `Optional<Set<String>>` instead of `Set<String>`. `Optional.empty()` is the explicit "bootstrap was interrupted by shutdown" signal; it is returned both when the supplier check fires between polls and when a `WakeupException` is caught during shutdown.
   - `GlobalStateUpdateTask.initialize()` checks the `Optional` first; if it is empty, it returns `Collections.emptyMap()` immediately and skips the rest of the initialization.
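
To illustrate the contract described above, here is a minimal, self-contained sketch. It is not the code in this PR: `BootstrapSketch`, `initializeStores`, and `initializeTask` are hypothetical stand-ins for `GlobalStateManager.initialize()` and `GlobalStateUpdateTask.initialize()`, the per-store loop stands in for the real poll loop, and the `Map<String, Long>` return type is a simplification chosen for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in classes and methods; not the actual Kafka Streams code.
final class BootstrapSketch {

    // Stand-in for GlobalStateManager.initialize():
    // an empty Optional means "bootstrap was interrupted by shutdown".
    static Optional<Set<String>> initializeStores(final Set<String> storeNames,
                                                  final BooleanSupplier inErrorState) {
        final Set<String> restored = new HashSet<>();
        for (final String store : storeNames) {
            // Assumed check point: the real code checks the supplier between polls.
            if (inErrorState.getAsBoolean()) {
                return Optional.empty();
            }
            restored.add(store);
        }
        return Optional.of(restored);
    }

    // Stand-in for GlobalStateUpdateTask.initialize():
    // check the Optional first and return an empty map if bootstrap was interrupted.
    static Map<String, Long> initializeTask(final Set<String> storeNames,
                                            final BooleanSupplier inErrorState) {
        final Optional<Set<String>> stores = initializeStores(storeNames, inErrorState);
        if (!stores.isPresent()) {
            return Collections.emptyMap();
        }
        // Placeholder for the rest of initialization: map each store to offset 0.
        final Map<String, Long> offsets = new HashMap<>();
        for (final String store : stores.get()) {
            offsets.put(store, 0L);
        }
        return offsets;
    }
}
```

With this shape, a caller can tell "interrupted" (`Optional.empty()`) apart from "completed with no stores" (`Optional.of(emptySet)`), which a plain `Set<String>` return value cannot express.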



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
