ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452370517



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -104,17 +104,16 @@ static void closeStateManager(final Logger log,
             if (stateDirectory.lock(id)) {
                 try {
                     stateMgr.close();
-
+                } catch (final ProcessorStateException e) {
+                    firstException.compareAndSet(null, e);
+                } finally {
                     if (wipeStateStore) {
                         log.debug("Wiping state stores for {} task {}", taskType, id);
                         // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
                         // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
                         // need to re-bootstrap the restoration from the beginning
                         Utils.delete(stateMgr.baseDir());

Review comment:
       Right, it's not a correctness issue, but it adds needless overhead to go through the whole cycle of initializing a task, hitting a `TaskCorruptedException`, then wiping it, and finally restarting it. Of course, if we keep hitting an issue during `closeDirty`, then we might never wipe the state, which does seem like a real problem. That could happen, for example, if there's an issue with the state itself, such as the files actually being corrupted.
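Below is a minimal plain-Java sketch of the pattern in the diff. `close()` runs inside `try`, and only the first failure is kept via `compareAndSet`. The wipe sits in `finally`, so a close that keeps failing can no longer skip it. `ClosableState`, `CloseThenWipe`, `closeAndMaybeWipe` and `deleteRecursively` are hypothetical stand-ins, not Kafka Streams APIs. The directory lock and Kafka's `Utils.delete` are left out, and the sketch assumes the caller rethrows whatever ends up in `firstException`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for a task's state manager (not the Kafka Streams class).
interface ClosableState {
    void close();      // may throw, e.g. during a dirty close
    Path baseDir();    // the task's state directory
}

public class CloseThenWipe {

    // Close the state, remember only the first failure, and wipe in `finally`
    // so that an exception from close() can no longer skip the wipe.
    static void closeAndMaybeWipe(final ClosableState state,
                                  final boolean wipeStateStore,
                                  final AtomicReference<RuntimeException> firstException) {
        try {
            state.close();
        } catch (final RuntimeException e) {
            // no-op if an earlier failure was already recorded
            firstException.compareAndSet(null, e);
        } finally {
            if (wipeStateStore) {
                try {
                    deleteRecursively(state.baseDir());
                } catch (final IOException e) {
                    firstException.compareAndSet(null, new RuntimeException("wipe failed", e));
                }
            }
        }
    }

    // Delete a directory tree, children before parents (reverse path order).
    static void deleteRecursively(final Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        final List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (final Path p : paths) {
            Files.delete(p);
        }
    }
}
```

Putting the wipe in `finally` makes the control flow slightly less linear. In return, the task directory is deleted even when the close keeps failing on corrupted files, and the original exception is still reported.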




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

