nicktelford commented on code in PR #21738:
URL: https://github.com/apache/kafka/pull/21738#discussion_r2940438849
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -254,70 +269,49 @@ public StateStore globalStore(final String name) {
}
// package-private for test only
- void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
- try {
- final Map<TopicPartition, Long> loadedCheckpoints =
checkpointFile.read();
-
- log.trace("Loaded offsets from the checkpoint file: {}",
loadedCheckpoints);
-
- for (final StateStoreMetadata store : stores.values()) {
- if (store.corrupted) {
- log.error("Tried to initialize store offsets for corrupted
store {}", store);
- throw new IllegalStateException("Should not initialize
offsets for a corrupted task");
- }
+ void initializeStoreOffsets(final boolean storeDirIsEmpty) {
+ for (final StateStoreMetadata store : stores.values()) {
+ if (store.corrupted) {
+ log.error("Tried to initialize store offsets for corrupted
store {}", store);
+ throw new ProcessorStateException(
+ "Error initializing offsets for store '" + store + "'",
+ new IllegalStateException("Should not initialize
offsets for a corrupted task")
+ );
+ }
- if (store.changelogPartition == null) {
- log.info("State store {} is not logged and hence would not
be restored", store.stateStore.name());
- } else if (!store.stateStore.persistent()) {
- log.info("Initializing to the starting offset for
changelog {} of in-memory state store {}",
- store.changelogPartition,
store.stateStore.name());
- } else if (store.offset() == null) {
- if
(loadedCheckpoints.containsKey(store.changelogPartition)) {
- final Long offset =
changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
- store.setOffset(offset);
-
- log.info("State store {} initialized from checkpoint
with offset {} at changelog {}",
- store.stateStore.name(), store.offset,
store.changelogPartition);
- } else {
- // with EOS, if the previous run did not shutdown
gracefully, we may lost the checkpoint file
- // and hence we are uncertain that the current local
state only contains committed data;
- // in that case we need to treat it as a
task-corrupted exception
- if (eosEnabled && !storeDirIsEmpty) {
- log.warn("State store {} did not find checkpoint
offsets while stores are not empty, " +
+ if (store.changelogPartition == null) {
+ log.info("State store {} is not logged and hence would not be
restored", store.stateStore.name());
+ } else if (!store.stateStore.persistent()) {
+ log.info("Initializing to the starting offset for changelog {}
of in-memory state store {}",
+ store.changelogPartition, store.stateStore.name());
+ } else if (store.offset() == null) {
+ final Long offset =
store.stateStore.committedOffset(store.changelogPartition);
+
+ if (offset != null) {
+ store.setOffset(offset);
+ log.info("State store {} initialized from checkpoint with
offset {} at changelog {}",
+ store.stateStore.name(), store.offset,
store.changelogPartition);
+ } else {
+ // with EOS, if the previous run did not shutdown
gracefully, we may lost the checkpoint file
+ // and hence we are uncertain that the current local state
only contains committed data;
+ // in that case we need to treat it as a task-corrupted
exception
+ if (eosEnabled && !storeDirIsEmpty) {
+ log.warn("State store {} did not find checkpoint
offsets while stores are not empty, " +
"since under EOS it has the risk of getting
uncommitted data in stores we have to " +
"treat it as a task corruption error and wipe
out the local state of task {} " +
"before re-bootstrapping",
store.stateStore.name(), taskId);
- throw new
TaskCorruptedException(Collections.singleton(taskId));
- } else {
- log.info("State store {} did not find checkpoint
offset, hence would " +
- "default to the starting offset at changelog
{}",
+ throw new
TaskCorruptedException(Collections.singleton(taskId));
+ } else {
+ log.info("State store {} did not find checkpoint
offset, hence would " +
+ "default to the starting offset at
changelog {}",
store.stateStore.name(),
store.changelogPartition);
- }
}
- } else {
- loadedCheckpoints.remove(store.changelogPartition);
- log.debug("Skipping re-initialization of offset from
checkpoint for recycled store {}",
- store.stateStore.name());
}
}
-
- stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
-
- if (!loadedCheckpoints.isEmpty()) {
- log.warn("Some loaded checkpoint offsets cannot find their
corresponding state stores: {}", loadedCheckpoints);
- }
-
- if (eosEnabled) {
- checkpointFile.delete();
- }
- } catch (final TaskCorruptedException e) {
- throw e;
- } catch (final IOException | RuntimeException e) {
- // both IOException or runtime exception like number parsing can
throw
- throw new ProcessorStateException(format("%sError loading and
deleting checkpoint file when creating the state manager",
- logPrefix), e);
}
+
+ stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
Review Comment:
Ah, yes. Looking at the implementation of `updateTaskOffsets`, it seems the
only exception that would previously have been caught here is a
`StreamsException` (wrapping an `IllegalStateException`), which is thrown when
one of the offsets is negative.
I actually think the old behaviour may have been a minor bug: wrapping
a `StreamsException` in a `ProcessorStateException` seems odd to me. In any
case, this exception should never be thrown, because it would require an
offset to somehow be negative, which I think would indicate either a bug or
data corruption.
Regardless, I'm restoring the `try`/`catch` block, scoped locally around this
statement, but with a slightly reworded error message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]