vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459762279
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                         restoreRecords.add(recordConverter.convert(record));
                     }
                 }
-                offset = globalConsumer.position(topicPartition);
+                try {

Review comment:
I can't comment above, but can the `poll` call itself throw a timeout exception? Or does it always just return no results in that case?

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                         restoreRecords.add(recordConverter.convert(record));
                     }
                 }
-                offset = globalConsumer.position(topicPartition);
+                try {
+                    offset = globalConsumer.position(topicPartition);
+                } catch (final TimeoutException error) {
+                    // the `globalConsumer.position()` call should never block, because we know that we did
+                    // a successful `position()` call above for the requested partition and thus the consumer
+                    // should have a valid local position that it can return immediately

Review comment:
I think this makes sense, but it seems to make some assumptions about the internal implementation of the consumer that don't seem necessary here. Is it important to assert that the consumer can't possibly get a timeout exception here?
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
+
+        long deadlineMs = NO_DEADLINE;
+        final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+        while (!storesToInitialize.isEmpty()) {
+            // we remove and add back on failure to round-robin through all stores
+            final StateStore stateStore = storesToInitialize.remove(0);
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+
+            try {
+                stateStore.init(globalProcessorContext, stateStore);
+                deadlineMs = NO_DEADLINE;
+            } catch (final RetryableErrorException retryableException) {
+                if (taskTimeoutMs == 0L) {
+                    throw new StreamsException(retryableException.getCause());

Review comment:
Why not just preserve the whole story of what happened, like `throw new StreamsException("Couldn't retry because timeout is set to zero", retryableException)`?
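The difference between the two wrapping styles in the comment above can be shown with a minimal, self-contained sketch. `RetryableErrorException` and `StreamsException` here are hypothetical stand-ins declared locally (not the real Kafka Streams classes), and `WrapDemo` is an illustrative helper name; the point is only what each style leaves in the cause chain.

```java
// Hypothetical stand-ins for the Kafka Streams exception types, so the sketch compiles on its own.
class RetryableErrorException extends RuntimeException {
    RetryableErrorException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

class StreamsException extends RuntimeException {
    StreamsException(final Throwable cause) {
        super(cause);
    }

    StreamsException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

class WrapDemo {
    // Style in the PR: only the root cause survives; the retryable wrapper and its message are dropped.
    static StreamsException unwrapCause(final RetryableErrorException e) {
        return new StreamsException(e.getCause());
    }

    // Suggested style: say why we gave up and keep the full chain (wrapper plus root cause).
    static StreamsException preserveChain(final RetryableErrorException e) {
        return new StreamsException("Couldn't retry because timeout is set to zero", e);
    }
}
```

With `preserveChain`, a logged stack trace shows both why retrying was skipped and the original failure; with `unwrapCause`, the context carried by the retryable wrapper is lost.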
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                         restoreRecords.add(recordConverter.convert(record));
                     }
                 }
-                offset = globalConsumer.position(topicPartition);
+                try {
+                    offset = globalConsumer.position(topicPartition);
+                } catch (final TimeoutException error) {
+                    // the `globalConsumer.position()` call should never block, because we know that we did
+                    // a successful `position()` call above for the requested partition and thus the consumer
+                    // should have a valid local position that it can return immediately
+
+                    // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+                    throw new IllegalStateException(error);
+                }
+
                 stateRestoreAdapter.restoreBatch(restoreRecords);
                 stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
                 restoreCount += restoreRecords.size();

Review comment:
Huh, interesting thought. Just to be clear, it looks like it would already block forever in this case today, right? Yeah, it does seem like we should implement a similar non-progress timeout for this loop in that case.
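A non-progress timeout for the restore loop could follow the same deadline pattern the PR uses for store initialization: start a deadline on the first poll that restores nothing, reset it whenever a poll makes progress, and fail once the wall clock passes the deadline. The sketch below is hypothetical: `BatchSource`, `RestoreTimeoutException`, `NonProgressRestoreLoop`, and the injectable clock are illustrative names, not part of `GlobalStateManagerImpl`, and it does not model how the real loop reads records from the global consumer.

```java
import java.util.function.LongSupplier;

// Hypothetical exception standing in for Kafka's TimeoutException.
class RestoreTimeoutException extends RuntimeException {
    RestoreTimeoutException(final String message) {
        super(message);
    }
}

class NonProgressRestoreLoop {
    static final long NO_DEADLINE = -1L;

    // Hypothetical source of restore batches; returns how many records one poll restored.
    interface BatchSource {
        int pollOnce();
    }

    // Restores until targetCount records are applied, failing if no poll makes progress
    // for longer than timeoutMs (measured from the first empty poll after the last progress).
    static long restore(final BatchSource source,
                        final long targetCount,
                        final long timeoutMs,
                        final LongSupplier clockMs) {
        long restored = 0L;
        long deadlineMs = NO_DEADLINE;
        while (restored < targetCount) {
            final int batch = source.pollOnce();
            if (batch > 0) {
                restored += batch;
                deadlineMs = NO_DEADLINE; // progress resets the timer
                continue;
            }
            final long nowMs = clockMs.getAsLong();
            if (deadlineMs == NO_DEADLINE) {
                final long candidateMs = nowMs + timeoutMs;
                deadlineMs = candidateMs < 0L ? Long.MAX_VALUE : candidateMs; // guard against overflow
            } else if (nowMs > deadlineMs) {
                throw new RestoreTimeoutException(
                    "Restore made no progress within " + timeoutMs + " ms");
            }
        }
        return restored;
    }
}
```

Resetting the deadline on every successful batch means a slow but steadily progressing restore never times out; only a stall longer than `timeoutMs` does.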
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
+
+        long deadlineMs = NO_DEADLINE;
+        final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+        while (!storesToInitialize.isEmpty()) {
+            // we remove and add back on failure to round-robin through all stores
+            final StateStore stateStore = storesToInitialize.remove(0);
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+
+            try {
+                stateStore.init(globalProcessorContext, stateStore);
+                deadlineMs = NO_DEADLINE;
+            } catch (final RetryableErrorException retryableException) {
+                if (taskTimeoutMs == 0L) {
+                    throw new StreamsException(retryableException.getCause());
+                }
+
+                storesToInitialize.add(stateStore);
+
+                final long currentWallClockMs = time.milliseconds();
+                if (deadlineMs == NO_DEADLINE) {
+                    final long newDeadlineMs = currentWallClockMs + taskTimeoutMs;
+                    deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
+                } else if (currentWallClockMs > deadlineMs) {
+                    throw new TimeoutException(String.format(
+                        "Global task did not make progress to restore state within %d ms. Adjust `task.timeout.ms` if needed.",
+                        currentWallClockMs - deadlineMs + taskTimeoutMs
+                    ));
+                }
+
+                log.debug(retryableException.getMessage() + " Will retry. Remaining time in milliseconds: {}", deadlineMs - currentWallClockMs);

Review comment:
```suggestion
                log.debug(String.format("Got an exception in store.init(). Will retry. Remaining time in milliseconds: %d", deadlineMs - currentWallClockMs), retryableException);
```
Sorry, my prior feedback was ambiguous.
This is what I meant.
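The round-robin retry in the store-initialization loop, combined with the logging suggestion, can be sketched in isolation. This is a hypothetical reduction: `Store`, `RoundRobinInit`, `initAll`, and the `LongSupplier` clock are illustrative stand-ins, any `RuntimeException` plays the role of `RetryableErrorException`, `IllegalStateException` stands in for both `StreamsException` and Kafka's `TimeoutException`, and `java.util.logging` replaces the SLF4J logger used in Kafka so the example has no dependencies. As in the suggestion, the exception is passed as the final logging argument so its stack trace is kept, instead of concatenating only `getMessage()` into the format string.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;

class RoundRobinInit {
    private static final Logger LOG = Logger.getLogger(RoundRobinInit.class.getName());
    static final long NO_DEADLINE = -1L;

    // Hypothetical stand-in for a state store whose init() may fail transiently.
    interface Store {
        String name();

        void init(); // throws a RuntimeException to signal a retryable failure
    }

    // Initializes all stores, re-queuing failed ones at the back (round-robin).
    // Any successful init resets the deadline; a stall longer than taskTimeoutMs is fatal.
    static List<String> initAll(final List<Store> stores,
                                final long taskTimeoutMs,
                                final LongSupplier clockMs) {
        final List<String> initialized = new ArrayList<>();
        final LinkedList<Store> pending = new LinkedList<>(stores);
        long deadlineMs = NO_DEADLINE;
        while (!pending.isEmpty()) {
            final Store store = pending.removeFirst();
            try {
                store.init();
                initialized.add(store.name());
                deadlineMs = NO_DEADLINE;
            } catch (final RuntimeException retryable) {
                if (taskTimeoutMs == 0L) {
                    // keep the whole story: a reason plus the original exception as cause
                    throw new IllegalStateException("Couldn't retry because timeout is set to zero", retryable);
                }
                pending.addLast(store);
                final long nowMs = clockMs.getAsLong();
                if (deadlineMs == NO_DEADLINE) {
                    final long candidateMs = nowMs + taskTimeoutMs;
                    deadlineMs = candidateMs < 0L ? Long.MAX_VALUE : candidateMs; // guard against overflow
                } else if (nowMs > deadlineMs) {
                    throw new IllegalStateException("No progress within " + taskTimeoutMs + " ms", retryable);
                }
                // exception as the last argument: the stack trace is logged, not just the message
                LOG.log(Level.FINE, String.format(
                    "Got an exception in store.init(). Will retry. Remaining time in milliseconds: %d",
                    deadlineMs - nowMs), retryable);
            }
        }
        return initialized;
    }
}
```

Re-queuing at the back lets healthy stores initialize while a failing one waits, and resetting the deadline on each success means the timeout bounds a lack of progress rather than total initialization time.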