ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452491173
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -104,26 +104,27 @@ static void closeStateManager(final Logger log,
if (stateDirectory.lock(id)) {
try {
stateMgr.close();
-
- if (wipeStateStore) {
- log.debug("Wiping state stores for {} task {}",
taskType, id);
- // we can just delete the whole dir of the task,
including the state store images and the checkpoint files,
- // and then we write an empty checkpoint file
indicating that the previous close is graceful and we just
- // need to re-bootstrap the restoration from the
beginning
- Utils.delete(stateMgr.baseDir());
- }
} catch (final ProcessorStateException e) {
firstException.compareAndSet(null, e);
} finally {
- stateDirectory.unlock(id);
+ try {
+ if (wipeStateStore) {
+ log.debug("Wiping state stores for {} task {}",
taskType, id);
+ // we can just delete the whole dir of the task,
including the state store images and the checkpoint files,
+ // and then we write an empty checkpoint file
indicating that the previous close is graceful and we just
+ // need to re-bootstrap the restoration from the
beginning
+ Utils.delete(stateMgr.baseDir());
+ }
+ } finally {
+ stateDirectory.unlock(id);
+ }
Review comment:
I can't use `ExceptionUtils#executeAll` because the compiler complains
that we don't handle the `IOException` unless we surround each Runnable with
its own try-catch block, at which point `#executeAll` isn't really doing
anything
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]