lucliu1108 commented on code in PR #22417:
URL: https://github.com/apache/kafka/pull/22417#discussion_r3350628260


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -173,66 +178,78 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext<?, ?> globa
     }
 
     @Override
-    public Set<String> initialize() {
+    public Optional<Set<String>> initialize() {
         droppedRecordsSensor = droppedRecordsSensor(
             Thread.currentThread().getName(),
             globalProcessorContext.taskId().toString(),
             globalProcessorContext.metrics()
         );
 
-        final Map<TopicPartition, StateStore> wrappedStores = new HashMap<>();
-        for (final StateStore stateStore : topology.globalStateStores()) {
-            final List<TopicPartition> storePartitions = 
topicPartitionsForStore(stateStore);
-            final StateStore maybeWrappedStore = 
LegacyCheckpointingStateStore.maybeWrapStore(
-                    stateStore, eosEnabled, new HashSet<>(storePartitions), 
stateDirectory, null, logPrefix);
-            try {
-                maybeWrappedStore.init(globalProcessorContext, 
maybeWrappedStore);
-            } catch (final ProcessorStateException e) {
-                if (eosEnabled) {
-                    log.warn("{}Detected unclean shutdown for global store {}. 
" +
-                            "Wiping global state directory.", logPrefix, 
stateStore.name(), e);
-                    try {
-                        
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
-                    } catch (final IOException ioe) {
-                        e.addSuppressed(ioe);
+        try {
+            final Map<TopicPartition, StateStore> wrappedStores = new 
HashMap<>();
+            for (final StateStore stateStore : topology.globalStateStores()) {
+                final List<TopicPartition> storePartitions = 
topicPartitionsForStore(stateStore);
+                final StateStore maybeWrappedStore = 
LegacyCheckpointingStateStore.maybeWrapStore(
+                        stateStore, eosEnabled, new 
HashSet<>(storePartitions), stateDirectory, null, logPrefix);
+                try {
+                    maybeWrappedStore.init(globalProcessorContext, 
maybeWrappedStore);
+                } catch (final ProcessorStateException e) {
+                    if (eosEnabled) {
+                        log.warn("{}Detected unclean shutdown for global store 
{}. " +
+                                "Wiping global state directory.", logPrefix, 
stateStore.name(), e);
+                        try {
+                            
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
+                        } catch (final IOException ioe) {
+                            e.addSuppressed(ioe);
+                        }
                     }
+                    throw e;
                 }
-                throw e;
-            }
 
-            for (final TopicPartition storePartition : storePartitions) {
-                wrappedStores.put(storePartition, maybeWrappedStore);
+                for (final TopicPartition storePartition : storePartitions) {
+                    wrappedStores.put(storePartition, maybeWrappedStore);
+                }
             }
-        }
 
-        // migrate offsets from legacy checkpoint file into the stores
-        LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, 
stateDirectory, null, wrappedStores);
-
-        for (final StateStoreMetadata metadata : storeMetadata.values()) {
-            // load the committed offsets from the store
-            final StateStore store = metadata.stateStore;
-            if (store.persistent()) {
-                for (final TopicPartition partition : 
metadata.changelogPartitions) {
-                    final Long offset = store.committedOffset(partition);
-                    if (offset != null) {
-                        currentOffsets.put(partition, offset);
+            // migrate offsets from legacy checkpoint file into the stores
+            LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, 
stateDirectory, null, wrappedStores);
+
+            for (final StateStoreMetadata metadata : storeMetadata.values()) {
+                if (shouldStopBootstrappingSupplier.getAsBoolean()) {

Review Comment:
   Yes. First path is supplier check between stores, the other one is 
wakeupException thrown by methods like `poll`/`partitionsFor`/...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to