vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459762279



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                             restoreRecords.add(recordConverter.convert(record));
                         }
                     }
-                    offset = globalConsumer.position(topicPartition);
+                    try {

Review comment:
       I can't comment above, but can the `poll` call itself throw a timeout exception? Or does it always just return no results in that case?
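   For context, a minimal self-contained sketch of the difference in question, assuming the usual `KafkaConsumer` contract (the broker address and topic below are placeholders): `poll()` returns an empty batch when nothing arrives within the passed timeout, whereas `position()` may need a remote offset lookup and can throw a `TimeoutException` once `default.api.timeout.ms` is exceeded.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PollVsPositionSketch {
    public static void main(final String[] args) {
        final Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        final TopicPartition partition = new TopicPartition("some-changelog-topic", 0); // placeholder topic

        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config)) {
            consumer.assign(Collections.singletonList(partition));

            // poll() does not signal "no data within the timeout" via an exception;
            // it simply returns an empty batch
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100L));
            System.out.println("records fetched: " + records.count());

            // position() may need a remote offset lookup and can throw a
            // TimeoutException if that lookup does not complete in time
            try {
                System.out.println("position: " + consumer.position(partition));
            } catch (final TimeoutException error) {
                System.out.println("position lookup timed out: " + error.getMessage());
            }
        }
    }
}
```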

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                             restoreRecords.add(recordConverter.convert(record));
                         }
                     }
-                    offset = globalConsumer.position(topicPartition);
+                    try {
+                        offset = globalConsumer.position(topicPartition);
+                    } catch (final TimeoutException error) {
+                        // the `globalConsumer.position()` call should never block, because we know that we did
+                        // a successful `position()` call above for the requested partition and thus the consumer
+                        // should have a valid local position that it can return immediately

Review comment:
       I think this makes sense, but it seems to make some assumptions about the internal implementation of the consumer that don't seem necessary here. Is it important to assert that the consumer can't possibly get a timeout exception here?
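   Purely as an illustration of the alternative (not something this PR proposes), a sketch that surfaces the timeout instead of asserting it can never happen; `offset`, `globalConsumer`, and `topicPartition` are the identifiers from the diff above.

```java
// illustrative sketch only -- identifiers come from the surrounding diff
try {
    offset = globalConsumer.position(topicPartition);
} catch (final TimeoutException error) {
    // makes no claim about consumer internals; just reports what failed and keeps the cause
    throw new StreamsException(
        "Could not get the position for " + topicPartition + " while restoring global state.",
        error
    );
}
```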

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
+
+        long deadlineMs = NO_DEADLINE;
+        final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+        while (!storesToInitialize.isEmpty()) {
+            // we remove and add back on failure to round-robin through all stores
+            final StateStore stateStore = storesToInitialize.remove(0);
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+
+            try {
+                stateStore.init(globalProcessorContext, stateStore);
+                deadlineMs = NO_DEADLINE;
+            } catch (final RetryableErrorException retryableException) {
+                if (taskTimeoutMs == 0L) {
+                    throw new StreamsException(retryableException.getCause());

Review comment:
       Why not just preserve the whole story of what happened, like `throw new StreamsException("Couldn't retry because timeout is set to zero", retryableException)`?
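   As a sketch of what that would look like (identifiers taken from the diff above), wrapping the retryable exception itself keeps both its message and the root cause in the resulting stack trace:

```java
// illustrative sketch only -- `taskTimeoutMs` and `retryableException` come from the diff above
if (taskTimeoutMs == 0L) {
    // chaining the whole retryable exception preserves the full story:
    // the wrapper's message plus the original cause and its stack trace
    throw new StreamsException("Couldn't retry because timeout is set to zero", retryableException);
}
```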

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                             restoreRecords.add(recordConverter.convert(record));
                         }
                     }
-                    offset = globalConsumer.position(topicPartition);
+                    try {
+                        offset = globalConsumer.position(topicPartition);
+                    } catch (final TimeoutException error) {
+                        // the `globalConsumer.position()` call should never block, because we know that we did
+                        // a successful `position()` call above for the requested partition and thus the consumer
+                        // should have a valid local position that it can return immediately
+
+                        // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+                        throw new IllegalStateException(error);
+                    }
+
                     stateRestoreAdapter.restoreBatch(restoreRecords);
                     stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
                     restoreCount += restoreRecords.size();

Review comment:
       Huh, interesting thought. Just to be clear, it looks like it would already block forever in this case today, right?
   Yeah, it does seem like we should implement a similar non-progress timeout for this loop in that case.
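   To make the idea concrete, a hypothetical sketch of such a non-progress deadline around a restore-style poll loop; the names and the exception type here are illustrative only, not this PR's code.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;

// Hypothetical sketch: fail the restore loop only if *no* batch arrives for
// `taskTimeoutMs` of wall-clock time; reset the deadline whenever progress is made.
final class RestoreLoopSketch {
    private static final long NO_DEADLINE = -1L;

    static void pollUntilCaughtUp(final Consumer<byte[], byte[]> consumer,
                                  final Time time,
                                  final long recordsToRestore,
                                  final long taskTimeoutMs) {
        long restoredSoFar = 0L;
        long deadlineMs = NO_DEADLINE;

        while (restoredSoFar < recordsToRestore) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100L));

            if (records.isEmpty()) {
                final long nowMs = time.milliseconds();
                if (deadlineMs == NO_DEADLINE) {
                    deadlineMs = nowMs + taskTimeoutMs;
                } else if (nowMs > deadlineMs) {
                    throw new StreamsException(
                        "Global task did not make progress restoring state within " + taskTimeoutMs + " ms.");
                }
                continue;
            }

            deadlineMs = NO_DEADLINE;           // progress made: reset the no-progress deadline
            restoredSoFar += records.count();   // stand-in for "convert, apply, and advance the offset"
        }
    }
}
```

   Whether such a timeout should surface as a `StreamsException`, a `TimeoutException`, or something else is a separate question.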
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
+
+        long deadlineMs = NO_DEADLINE;
+        final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+        while (!storesToInitialize.isEmpty()) {
+            // we remove and add back on failure to round-robin through all stores
+            final StateStore stateStore = storesToInitialize.remove(0);
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+
+            try {
+                stateStore.init(globalProcessorContext, stateStore);
+                deadlineMs = NO_DEADLINE;
+            } catch (final RetryableErrorException retryableException) {
+                if (taskTimeoutMs == 0L) {
+                    throw new StreamsException(retryableException.getCause());
+                }
+
+                storesToInitialize.add(stateStore);
+
+                final long currentWallClockMs = time.milliseconds();
+                if (deadlineMs == NO_DEADLINE) {
+                    final long newDeadlineMs = currentWallClockMs + taskTimeoutMs;
+                    deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
+                } else if (currentWallClockMs > deadlineMs) {
+                    throw new TimeoutException(String.format(
+                        "Global task did not make progress to restore state within %d ms. Adjust `task.timeout.ms` if needed.",
+                        currentWallClockMs - deadlineMs + taskTimeoutMs
+                    ));
+                }
+
+                log.debug(retryableException.getMessage() + " Will retry. Remaining time in milliseconds: {}", deadlineMs - currentWallClockMs);

Review comment:
       ```suggestion
                log.debug(String.format("Got an exception in store.init(). Will retry. Remaining time in milliseconds: %d",  deadlineMs - currentWallClockMs), retryableException);
   ```
   
   Sorry, my prior feedback was ambiguous. This is what I meant.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

