vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459762279
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
restoreRecords.add(recordConverter.convert(record));
}
}
- offset = globalConsumer.position(topicPartition);
+ try {
Review comment:
I can't comment on the lines above this hunk, but can the `poll` call itself throw a timeout
exception? Or does it always just return no results in that case?
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
restoreRecords.add(recordConverter.convert(record));
}
}
- offset = globalConsumer.position(topicPartition);
+ try {
+ offset = globalConsumer.position(topicPartition);
+ } catch (final TimeoutException error) {
+ // the `globalConsumer.position()` call should never block, because we know that we did
+ // a successful `position()` call above for the requested partition and thus the consumer
+ // should have a valid local position that it can return immediately
Review comment:
I think this makes sense, but it seems to make some assumptions about
the internal implementation of the consumer that don't seem necessary here.
Is it important to assert that the consumer can't possibly get a timeout
exception here?
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
}
final Set<String> changelogTopics = new HashSet<>();
- for (final StateStore stateStore : globalStateStores) {
+
+ long deadlineMs = NO_DEADLINE;
+ final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+ while (!storesToInitialize.isEmpty()) {
+ // we remove and add back on failure to round-robin through all stores
+ final StateStore stateStore = storesToInitialize.remove(0);
globalStoreNames.add(stateStore.name());
final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
changelogTopics.add(sourceTopic);
- stateStore.init(globalProcessorContext, stateStore);
+
+ try {
+ stateStore.init(globalProcessorContext, stateStore);
+ deadlineMs = NO_DEADLINE;
+ } catch (final RetryableErrorException retryableException) {
+ if (taskTimeoutMs == 0L) {
+ throw new StreamsException(retryableException.getCause());
Review comment:
Why not just preserve the whole story of what happened, like `throw new
StreamsException("Couldn't retry because timeout is set to zero",
retryableException)`?
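A minimal sketch of the difference the reviewer is pointing at. To keep it runnable without Kafka on the classpath, `StandInStreamsException` and `StandInRetryableException` are hypothetical stand-ins for `StreamsException` and the PR's `RetryableErrorException`; only their cause-chaining behavior matters here. Rethrowing `getCause()` drops the retryable wrapper and adds no context, while passing the caught exception as the cause keeps the whole chain in the stack trace.

```java
// Hypothetical stand-ins for Kafka's StreamsException and the PR's
// RetryableErrorException; only the cause-chaining behavior matters here.
class StandInStreamsException extends RuntimeException {
    StandInStreamsException(final String message, final Throwable cause) {
        super(message, cause);
    }

    StandInStreamsException(final Throwable cause) {
        super(cause);
    }
}

class StandInRetryableException extends RuntimeException {
    StandInRetryableException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

class ExceptionChainDemo {
    // Approach in the diff: rethrow only the inner cause, losing the wrapper.
    static RuntimeException unwrapOnly(final StandInRetryableException e) {
        return new StandInStreamsException(e.getCause());
    }

    // Suggested approach: add context and keep the whole exception as the cause.
    static RuntimeException preserveChain(final StandInRetryableException e) {
        return new StandInStreamsException("Couldn't retry because timeout is set to zero", e);
    }

    public static void main(final String[] args) {
        // Illustrative messages only.
        final StandInRetryableException retryable = new StandInRetryableException(
            "store.init() failed", new IllegalStateException("underlying failure"));

        // Direct cause is the inner exception; the wrapper's message is gone.
        System.out.println(unwrapOnly(retryable).getCause().getMessage());
        // Direct cause is the retryable exception, which still carries its own cause.
        System.out.println(preserveChain(retryable).getCause().getMessage());
        System.out.println(preserveChain(retryable).getCause().getCause().getMessage());
    }
}
```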
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
restoreRecords.add(recordConverter.convert(record));
}
}
- offset = globalConsumer.position(topicPartition);
+ try {
+ offset = globalConsumer.position(topicPartition);
+ } catch (final TimeoutException error) {
+ // the `globalConsumer.position()` call should never block, because we know that we did
+ // a successful `position()` call above for the requested partition and thus the consumer
+ // should have a valid local position that it can return immediately
+
+ // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+ throw new IllegalStateException(error);
+ }
+
stateRestoreAdapter.restoreBatch(restoreRecords);
stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
restoreCount += restoreRecords.size();
Review comment:
Huh, interesting thought. Just to be clear, it looks like it would
already block forever in this case today, right?
Yeah, it does seem like we should implement a similar non-progress timeout
for this loop in that case.
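One way to read this suggestion is to reuse the deadline handling that the PR adds to the store-initialization loop: start a deadline on the first iteration that makes no progress, reset it whenever progress is made, clamp it to `Long.MAX_VALUE` on overflow, and fail once the wall clock passes it. The sketch below is hypothetical: `NonProgressDeadline`, `onProgress`, `onNoProgress`, the `-1L` sentinel, and `NoProgressTimeoutException` (an unchecked stand-in for Kafka's `TimeoutException`) are illustrative names, not Kafka APIs, and `taskTimeoutMs` is assumed to be positive because the zero case fails fast separately in the diff.

```java
import java.util.function.LongSupplier;

// Hypothetical unchecked exception standing in for Kafka's TimeoutException.
class NoProgressTimeoutException extends RuntimeException {
    NoProgressTimeoutException(final String message) {
        super(message);
    }
}

// Hypothetical helper mirroring the deadline handling of the PR's init loop.
class NonProgressDeadline {
    private static final long NO_DEADLINE = -1L; // assumed sentinel; the PR's value is not shown

    private final long taskTimeoutMs;   // assumed > 0; the zero case fails fast elsewhere
    private final LongSupplier clockMs; // wall-clock source, injectable for tests
    private long deadlineMs = NO_DEADLINE;

    NonProgressDeadline(final long taskTimeoutMs, final LongSupplier clockMs) {
        this.taskTimeoutMs = taskTimeoutMs;
        this.clockMs = clockMs;
    }

    // Call after an iteration that made progress, e.g. restored at least one record.
    void onProgress() {
        deadlineMs = NO_DEADLINE;
    }

    // Call after an iteration that made no progress; throws once the deadline has passed.
    void onNoProgress() {
        final long nowMs = clockMs.getAsLong();
        if (deadlineMs == NO_DEADLINE) {
            final long newDeadlineMs = nowMs + taskTimeoutMs;
            // A huge timeout (e.g. Long.MAX_VALUE) overflows; clamp it to "never".
            deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
        } else if (nowMs > deadlineMs) {
            // Elapsed time since the first non-progress iteration.
            throw new NoProgressTimeoutException(String.format(
                "No progress within %d ms. Adjust `task.timeout.ms` if needed.",
                nowMs - deadlineMs + taskTimeoutMs));
        }
    }
}
```

In the restore loop, `onProgress()` would be called when a `poll` returns records and `onNoProgress()` when it returns none, so a partition that never yields data eventually fails instead of blocking forever.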
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
}
final Set<String> changelogTopics = new HashSet<>();
- for (final StateStore stateStore : globalStateStores) {
+
+ long deadlineMs = NO_DEADLINE;
+ final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+ while (!storesToInitialize.isEmpty()) {
+ // we remove and add back on failure to round-robin through all stores
+ final StateStore stateStore = storesToInitialize.remove(0);
globalStoreNames.add(stateStore.name());
final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
changelogTopics.add(sourceTopic);
- stateStore.init(globalProcessorContext, stateStore);
+
+ try {
+ stateStore.init(globalProcessorContext, stateStore);
+ deadlineMs = NO_DEADLINE;
+ } catch (final RetryableErrorException retryableException) {
+ if (taskTimeoutMs == 0L) {
+ throw new StreamsException(retryableException.getCause());
+ }
+
+ storesToInitialize.add(stateStore);
+
+ final long currentWallClockMs = time.milliseconds();
+ if (deadlineMs == NO_DEADLINE) {
+ final long newDeadlineMs = currentWallClockMs + taskTimeoutMs;
+ deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
+ } else if (currentWallClockMs > deadlineMs) {
+ throw new TimeoutException(String.format(
+ "Global task did not make progress to restore state within %d ms. Adjust `task.timeout.ms` if needed.",
+ currentWallClockMs - deadlineMs + taskTimeoutMs
+ ));
+ }
+
+ log.debug(retryableException.getMessage() + " Will retry. Remaining time in milliseconds: {}", deadlineMs - currentWallClockMs);
Review comment:
```suggestion
log.debug(String.format("Got an exception in store.init(). Will retry. Remaining time in milliseconds: %d", deadlineMs - currentWallClockMs), retryableException);
```
Sorry, my prior feedback was ambiguous. This is what I meant.
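The `{}` placeholder in the original call suggests an SLF4J-style logger, which treats a trailing `Throwable` argument as the exception to print, stack trace included; concatenating `retryableException.getMessage()` keeps only one line of text. The sketch below uses `java.util.logging` as a stand-in for that logger (an assumption, so it runs with only the JDK) and captures records with a small handler to show that only the suggested form carries the exception object with the log record. `RetryLoggingDemo` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

class RetryLoggingDemo {
    // Collects records so we can inspect what the logger received.
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override public void publish(final LogRecord record) { records.add(record); }
        @Override public void flush() { }
        @Override public void close() { }
    }

    static Logger newLogger(final Handler handler) {
        final Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false); // keep console output quiet
        logger.setLevel(Level.ALL);
        handler.setLevel(Level.ALL);
        logger.addHandler(handler);
        return logger;
    }

    // Message-only style: type, cause chain and stack trace are dropped.
    static void logMessageOnly(final Logger log, final Exception e, final long remainingMs) {
        log.fine(e.getMessage() + " Will retry. Remaining time in milliseconds: " + remainingMs);
    }

    // Suggested style: a formatted message plus the exception itself.
    static void logWithThrowable(final Logger log, final Exception e, final long remainingMs) {
        log.log(Level.FINE, String.format(
            "Got an exception in store.init(). Will retry. Remaining time in milliseconds: %d",
            remainingMs), e);
    }
}
```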
----------------------------------------------------------------
This is an automated message from the Apache Git Service.