UladzislauBlok commented on code in PR #22417:
URL: https://github.com/apache/kafka/pull/22417#discussion_r3330149215


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -348,7 +357,21 @@ private void reprocessState(final StateStoreMetadata storeMetadata) {
                 // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
                 //      `poll(pollMS)` without adding the request timeout and do a more precise
                 //      timeout handling
-                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+                if (inErrorStateSupplier.getAsBoolean()) {
+                    logBootstrapInterrupted(storeMetadata);
+                    return;
+                }

Review Comment:
   This looks like a lot of duplicated code. Can we move the check into a
   dedicated method, or keep it at a higher level?
   AFAIU, once restoration of the current store completes and we move on to the
   next one, we would interrupt it there anyway. It's a trade-off: we react a bit
   later, but we avoid checking the same condition n times.
   ```java
   for (final StateStoreMetadata metadata : storeMetadata.values()) {
       if (inErrorStateSupplier.getAsBoolean()) {
           log.info("Global store bootstrap interrupted by shutdown before starting {}", metadata.stateStore.name());
           break;
       }
   ...
   }
   ```
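
To make the trade-off concrete, here is a minimal, self-contained sketch of the per-store check. It is not the actual `GlobalStateManagerImpl` code: `PerStoreCheckSketch`, `restoreAll`, and the use of plain store names are hypothetical stand-ins, and the per-store poll loop is reduced to a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the bootstrap loop; not the real GlobalStateManagerImpl.
final class PerStoreCheckSketch {

    // Restores stores in order and returns the names that were restored.
    // The shutdown flag is checked once per store, before restoration starts.
    static List<String> restoreAll(final List<String> storeNames,
                                   final BooleanSupplier inErrorState) {
        final List<String> restored = new ArrayList<>();
        for (final String name : storeNames) {
            if (inErrorState.getAsBoolean()) {
                System.out.println("Bootstrap interrupted by shutdown before starting " + name);
                break;
            }
            // Placeholder for the per-store poll loop. In this variant it runs to
            // completion even if shutdown is requested while it is in progress.
            restored.add(name);
        }
        return restored;
    }
}
```

With this shape the flag is read once per store rather than once per poll, so a shutdown requested mid-restore is only noticed when the next store would start.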



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -299,7 +300,13 @@ public void run() {
                 if (size != -1L) {
                     cache.resize(size);
                 }
-                stateConsumer.pollAndUpdate();
+                try {
+                    stateConsumer.pollAndUpdate();
+                } catch (final WakeupException e) {
+                    if (!inErrorState()) {
+                        throw e;
+                    }
+                }

Review Comment:
   I think this is not part of bootstrapping, is it?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -429,8 +437,21 @@ private StateConsumer initialize() {
                 );
             }
 
+            if (inErrorState()) {
+                closeStateConsumer(stateConsumer, false);
+                return null;
+            }
+
             setState(RUNNING);
             return stateConsumer;
+        } catch (final WakeupException e) {
+            closeStateConsumer(stateConsumer, false);
+            if (inErrorState()) {
+                log.info("Global thread initialization interrupted by shutdown");
+            } else {
+                startupException = new StreamsException(
+                    "Unexpected wakeup during initialization of GlobalStreamThread", e);
+            }

Review Comment:
   I think this part should be enough:
   ```java
   if (inErrorState()) {
       closeStateConsumer(stateConsumer, false);
       return null;
   }
   ```
   
   Do we need to catch `WakeupException`?
   UPD: The overall idea is to `break` out of execution in `GlobalStateManagerImpl`
   and then verify whether it was interrupted (by checking `inErrorState()`).
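
The "break, then check the flag" idea can be sketched as follows. This is an illustrative model only: `InitializeSketch`, `bootstrap`, `initialize`, and the `shutdownAfter` parameter are hypothetical names, an `AtomicBoolean` stands in for `inErrorState()`, and the sketch assumes shutdown is signalled only through that flag. Whether `WakeupException` can be dropped in the real code depends on whether shutdown also calls `wakeup()` on the consumer, which this excerpt does not show.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the control flow; not the real GlobalStreamThread.
final class InitializeSketch {

    // Stand-in for the state manager: processes batches and breaks out early
    // once the error-state flag is set. Returns the number of batches processed.
    static int bootstrap(final AtomicBoolean inErrorState,
                         final int batches,
                         final int shutdownAfter) {
        int processed = 0;
        while (processed < batches) {
            if (inErrorState.get()) {
                break; // no exception, just stop early
            }
            processed++;
            if (processed == shutdownAfter) {
                inErrorState.set(true); // simulate a shutdown request mid-bootstrap
            }
        }
        return processed;
    }

    // Stand-in for initialize(): after bootstrap returns, the caller checks the
    // flag and returns null instead of moving to the running state.
    static String initialize(final AtomicBoolean inErrorState,
                             final int batches,
                             final int shutdownAfter) {
        bootstrap(inErrorState, batches, shutdownAfter);
        if (inErrorState.get()) {
            return null; // interrupted: the real code would close the consumer here
        }
        return "RUNNING";
    }
}
```

In this model the caller needs no `catch` block, because the early exit is an ordinary return and the flag alone tells it whether bootstrap was interrupted.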


