nicktelford commented on code in PR #21738:
URL: https://github.com/apache/kafka/pull/21738#discussion_r2940438849


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -254,70 +269,49 @@ public StateStore globalStore(final String name) {
     }
 
     // package-private for test only
-    void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
-        try {
-            final Map<TopicPartition, Long> loadedCheckpoints = 
checkpointFile.read();
-
-            log.trace("Loaded offsets from the checkpoint file: {}", 
loadedCheckpoints);
-
-            for (final StateStoreMetadata store : stores.values()) {
-                if (store.corrupted) {
-                    log.error("Tried to initialize store offsets for corrupted 
store {}", store);
-                    throw new IllegalStateException("Should not initialize 
offsets for a corrupted task");
-                }
+    void initializeStoreOffsets(final boolean storeDirIsEmpty) {
+        for (final StateStoreMetadata store : stores.values()) {
+            if (store.corrupted) {
+                log.error("Tried to initialize store offsets for corrupted 
store {}", store);
+                throw new ProcessorStateException(
+                        "Error initializing offsets for store '" + store + "'",
+                        new IllegalStateException("Should not initialize 
offsets for a corrupted task")
+                );
+            }
 
-                if (store.changelogPartition == null) {
-                    log.info("State store {} is not logged and hence would not 
be restored", store.stateStore.name());
-                } else if (!store.stateStore.persistent()) {
-                    log.info("Initializing to the starting offset for 
changelog {} of in-memory state store {}",
-                             store.changelogPartition, 
store.stateStore.name());
-                } else if (store.offset() == null) {
-                    if 
(loadedCheckpoints.containsKey(store.changelogPartition)) {
-                        final Long offset = 
changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
-                        store.setOffset(offset);
-
-                        log.info("State store {} initialized from checkpoint 
with offset {} at changelog {}",
-                                  store.stateStore.name(), store.offset, 
store.changelogPartition);
-                    } else {
-                        // with EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
-                        // and hence we are uncertain that the current local 
state only contains committed data;
-                        // in that case we need to treat it as a 
task-corrupted exception
-                        if (eosEnabled && !storeDirIsEmpty) {
-                            log.warn("State store {} did not find checkpoint 
offsets while stores are not empty, " +
+            if (store.changelogPartition == null) {
+                log.info("State store {} is not logged and hence would not be 
restored", store.stateStore.name());
+            } else if (!store.stateStore.persistent()) {
+                log.info("Initializing to the starting offset for changelog {} 
of in-memory state store {}",
+                        store.changelogPartition, store.stateStore.name());
+            } else if (store.offset() == null) {
+                final Long offset = 
store.stateStore.committedOffset(store.changelogPartition);
+
+                if (offset != null) {
+                    store.setOffset(offset);
+                    log.info("State store {} initialized from checkpoint with 
offset {} at changelog {}",
+                            store.stateStore.name(), store.offset, 
store.changelogPartition);
+                } else {
+                    // with EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
+                    // and hence we are uncertain that the current local state 
only contains committed data;
+                    // in that case we need to treat it as a task-corrupted 
exception
+                    if (eosEnabled && !storeDirIsEmpty) {
+                        log.warn("State store {} did not find checkpoint 
offsets while stores are not empty, " +
                                 "since under EOS it has the risk of getting 
uncommitted data in stores we have to " +
                                 "treat it as a task corruption error and wipe 
out the local state of task {} " +
                                 "before re-bootstrapping", 
store.stateStore.name(), taskId);
 
-                            throw new 
TaskCorruptedException(Collections.singleton(taskId));
-                        } else {
-                            log.info("State store {} did not find checkpoint 
offset, hence would " +
-                                "default to the starting offset at changelog 
{}",
+                        throw new 
TaskCorruptedException(Collections.singleton(taskId));
+                    } else {
+                        log.info("State store {} did not find checkpoint 
offset, hence would " +
+                                        "default to the starting offset at 
changelog {}",
                                 store.stateStore.name(), 
store.changelogPartition);
-                        }
                     }
-                }  else {
-                    loadedCheckpoints.remove(store.changelogPartition);
-                    log.debug("Skipping re-initialization of offset from 
checkpoint for recycled store {}",
-                              store.stateStore.name());
                 }
             }
-
-            stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
-
-            if (!loadedCheckpoints.isEmpty()) {
-                log.warn("Some loaded checkpoint offsets cannot find their 
corresponding state stores: {}", loadedCheckpoints);
-            }
-
-            if (eosEnabled) {
-                checkpointFile.delete();
-            }
-        } catch (final TaskCorruptedException e) {
-            throw e;
-        } catch (final IOException | RuntimeException e) {
-            // both IOException or runtime exception like number parsing can 
throw
-            throw new ProcessorStateException(format("%sError loading and 
deleting checkpoint file when creating the state manager",
-                logPrefix), e);
         }
+
+        stateDirectory.updateTaskOffsets(taskId, changelogOffsets());

Review Comment:
   Ahh yes. Looking at the implementation of `updateTaskOffsets`, it seems the 
only exception that would have previously been caught here is a 
`StreamsException` (wrapping an `IllegalStateException`), thrown when one of 
the offsets is negative.
   
   I actually think the old behaviour might have been a bit of a bug (wrapping 
a `StreamsException` in a `ProcessorStateException` seems weird to me). But 
this exception should never be thrown anyway, because it would require an 
offset to somehow be negative (which I think would indicate either a bug or 
data corruption).
   
   Regardless, I'm restoring the `try`/`catch` block, scoped locally around just 
this statement and with a slightly reworded error message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to