This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2ff1a18744e8e2b96d5cc447e80df7c1dbb614c8
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Dec 30 12:25:20 2021 +0100

    [FLINK-25817] Clean up of orphaned local state
    
    This commit adds logic to clean up orphaned local state by only retaining those allocation
    directories that could be recovered from the SlotAllocationSnapshotPersistence service.
---
 .../state/TaskExecutorLocalStateStoresManager.java | 64 +++++++++++++++++++++-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  5 ++
 .../TaskExecutorLocalStateStoresManagerTest.java   | 44 +++++++++++++++
 3 files changed, 112 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index f7ffca0..22c8946 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -35,9 +35,15 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This class holds the all {@link TaskLocalStateStoreImpl} objects for a task 
executor (manager).
@@ -48,6 +54,8 @@ public class TaskExecutorLocalStateStoresManager {
     private static final Logger LOG =
             LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
 
+    /** Prefix that {@code allocationSubDirString} puts in front of an allocation id's hex string. */
+    public static final String ALLOCATION_DIR_PREFIX = "aid_";
+
     /**
      * This map holds all local state stores for tasks running on the task 
manager / executor that
      * own the instance of this. Maps from allocation id to all the subtask's 
local state stores.
@@ -219,6 +227,60 @@ public class TaskExecutorLocalStateStoresManager {
         cleanupAllocationBaseDirs(allocationID);
     }
 
+    /**
+     * Releases the local state of every stored allocation that is not part of the given set. The
+     * local state of the allocations contained in the set is left untouched.
+     *
+     * @param allocationsToRetain ids of the allocations whose local state must be kept
+     */
+    public void retainLocalStateForAllocations(Set<AllocationID> allocationsToRetain) {
+        for (AllocationID storedAllocation : findStoredAllocations()) {
+            if (!allocationsToRetain.contains(storedAllocation)) {
+                releaseLocalStateForAllocationId(storedAllocation);
+            }
+        }
+    }
+
+    /**
+     * Scans all local state root directories for allocation directories and decodes the
+     * allocation id from each directory name (the part after {@link #ALLOCATION_DIR_PREFIX}).
+     *
+     * <p>A root directory that cannot be listed is logged and skipped, so the result may be
+     * incomplete in that case.
+     *
+     * @return the distinct allocation ids found in the listable root directories
+     */
+    private Collection<AllocationID> findStoredAllocations() {
+        final int prefixLength = ALLOCATION_DIR_PREFIX.length();
+        final Set<AllocationID> result = new HashSet<>();
+
+        for (File rootDirectory : localStateRootDirectories.deref()) {
+            try {
+                for (Path candidate : listAllocationDirectoriesIn(rootDirectory)) {
+                    final String directoryName = candidate.getFileName().toString();
+                    final String hexPart = directoryName.substring(prefixLength);
+                    result.add(AllocationID.fromHexString(hexPart));
+                }
+            } catch (IOException ioe) {
+                final String message =
+                        "Could not list local state directory {}. This entails that some orphaned local state might not be cleaned up properly.";
+
+                // Only attach the stack trace when debug logging is enabled.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(message, rootDirectory, ioe);
+                } else {
+                    LOG.info(message, rootDirectory);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Lists the entries directly inside the given local state root directory whose file name
+     * starts with {@link #ALLOCATION_DIR_PREFIX}.
+     *
+     * @param localStateRootDirectory directory whose direct children are inspected
+     * @return paths of all children whose name carries the allocation directory prefix
+     * @throws IOException if the directory cannot be listed
+     */
+    @VisibleForTesting
+    @Nonnull
+    static Collection<Path> listAllocationDirectoriesIn(File localStateRootDirectory)
+            throws IOException {
+        // The stream returned by Files.list keeps the directory open until it is closed, so it
+        // has to be consumed inside try-with-resources to avoid leaking the handle.
+        try (java.util.stream.Stream<Path> children =
+                Files.list(localStateRootDirectory.toPath())) {
+            return children.filter(
+                            path ->
+                                    path.getFileName()
+                                            .toString()
+                                            .startsWith(ALLOCATION_DIR_PREFIX))
+                    .collect(Collectors.toList());
+        }
+    }
+
     public void shutdown() {
         synchronized (lock) {
             if (closed) {
@@ -259,7 +321,7 @@ public class TaskExecutorLocalStateStoresManager {
 
     @VisibleForTesting
     String allocationSubDirString(AllocationID allocationID) {
-        return "aid_" + allocationID.toHexString();
+        return ALLOCATION_DIR_PREFIX + allocationID.toHexString();
     }
 
     private File[] allocationBaseDirectories(AllocationID allocationID) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5ccb6de..7e4afb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -2049,6 +2049,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
         log.debug("Recovered slot allocation snapshots {}.", 
slotAllocationSnapshots);
 
+        final Set<AllocationID> allocatedSlots = new HashSet<>();
         for (SlotAllocationSnapshot slotAllocationSnapshot : 
slotAllocationSnapshots) {
             try {
                 allocateSlot(
@@ -2066,7 +2067,11 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
             } catch (Exception e) {
                 log.debug("Cannot reallocate restored slot {}.", 
slotAllocationSnapshot, e);
             }
+
+            allocatedSlots.add(slotAllocationSnapshot.getAllocationId());
         }
+
+        localStateStoresManager.retainLocalStateForAllocations(allocatedSlots);
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 0c6e95b..5fd6ddf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -36,6 +36,8 @@ import org.apache.flink.util.Reference;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
+
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -44,7 +46,9 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collection;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -318,6 +322,46 @@ public class TaskExecutorLocalStateStoresManagerTest 
extends TestLogger {
         }
     }
 
+    /**
+     * Verifies that {@code retainLocalStateForAllocations} removes the allocation directory of an
+     * allocation that is not part of the retained set.
+     */
+    @Test
+    public void testRetainLocalStateForAllocationsDeletesUnretainedAllocationDirectories()
+            throws IOException {
+        final File rootDir = temporaryFolder.newFolder();
+        final TaskExecutorLocalStateStoresManager storesManager =
+                new TaskExecutorLocalStateStoresManager(
+                        true, Reference.owned(new File[] {rootDir}), Executors.directExecutor());
+        final JobID jobId = new JobID();
+        final AllocationID retainedAllocationId = new AllocationID();
+        final AllocationID otherAllocationId = new AllocationID();
+        final JobVertexID jobVertexId = new JobVertexID();
+
+        // one local state store per allocation; two allocation directories are expected afterwards
+        storesManager.localStateStoreForSubtask(jobId, retainedAllocationId, jobVertexId, 0);
+        storesManager.localStateStoreForSubtask(jobId, otherAllocationId, jobVertexId, 1);
+
+        assertThat(TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(rootDir))
+                .hasSize(2);
+
+        storesManager.retainLocalStateForAllocations(Sets.newHashSet(retainedAllocationId));
+
+        assertThat(TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(rootDir))
+                .hasSize(1);
+
+        final File removedAllocationDir =
+                new File(rootDir, storesManager.allocationSubDirString(otherAllocationId));
+        assertThat(removedAllocationDir).doesNotExist();
+    }
+
     private void checkRootDirsClean(File[] rootDirs) {
         for (File rootDir : rootDirs) {
             File[] files = rootDir.listFiles();

Reply via email to