This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2ff1a18744e8e2b96d5cc447e80df7c1dbb614c8 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Thu Dec 30 12:25:20 2021 +0100 [FLINK-25817] Clean up of orphaned local state This commit adds logic to clean up orphaned local state by only retaining those allocation directories that could be recovered from the SlotAllocationSnapshotPersistence service. --- .../state/TaskExecutorLocalStateStoresManager.java | 64 +++++++++++++++++++++- .../flink/runtime/taskexecutor/TaskExecutor.java | 5 ++ .../TaskExecutorLocalStateStoresManagerTest.java | 44 +++++++++++++++ 3 files changed, 112 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java index f7ffca0..22c8946 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java @@ -35,9 +35,15 @@ import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This class holds the all {@link TaskLocalStateStoreImpl} objects for a task executor (manager). @@ -48,6 +54,8 @@ public class TaskExecutorLocalStateStoresManager { private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class); + public static final String ALLOCATION_DIR_PREFIX = "aid_"; + /** * This map holds all local state stores for tasks running on the task manager / executor that * own the instance of this. Maps from allocation id to all the subtask's local state stores. 
@@ -219,6 +227,82 @@ public class TaskExecutorLocalStateStoresManager {
         cleanupAllocationBaseDirs(allocationID);
     }
 
+    /**
+     * Retains the local state of the given allocations and releases the local state of all other
+     * allocations found in the local state root directories.
+     *
+     * @param allocationsToRetain allocations whose local state must be kept
+     */
+    public void retainLocalStateForAllocations(Set<AllocationID> allocationsToRetain) {
+        final Collection<AllocationID> allocationIds = findStoredAllocations();
+
+        allocationIds.stream()
+                .filter(allocationId -> !allocationsToRetain.contains(allocationId))
+                .forEach(this::releaseLocalStateForAllocationId);
+    }
+
+    /**
+     * Collects the ids of all allocations that have an entry in one of the local state root
+     * directories. Root directories that cannot be listed are logged and skipped.
+     */
+    private Collection<AllocationID> findStoredAllocations() {
+        final Set<AllocationID> storedAllocations = new HashSet<>();
+        for (File localStateRootDirectory : localStateRootDirectories.deref()) {
+            try {
+                final Collection<Path> allocationDirectories =
+                        listAllocationDirectoriesIn(localStateRootDirectory);
+
+                for (Path allocationDirectory : allocationDirectories) {
+                    // names have the form ALLOCATION_DIR_PREFIX + hex-encoded allocation id
+                    final String hexString =
+                            allocationDirectory
+                                    .getFileName()
+                                    .toString()
+                                    .substring(ALLOCATION_DIR_PREFIX.length());
+                    storedAllocations.add(AllocationID.fromHexString(hexString));
+                }
+            } catch (IOException ioe) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Could not list local state directory {}. This entails that some orphaned local state might not be cleaned up properly.",
+                            localStateRootDirectory,
+                            ioe);
+                } else {
+                    LOG.info(
+                            "Could not list local state directory {}. This entails that some orphaned local state might not be cleaned up properly.",
+                            localStateRootDirectory);
+                }
+            }
+        }
+
+        return storedAllocations;
+    }
+
+    /**
+     * Lists the entries of the given directory (non-recursively) whose names start with {@link
+     * #ALLOCATION_DIR_PREFIX}.
+     *
+     * @param localStateRootDirectory the local state root directory to scan
+     * @return paths of all matching entries
+     * @throws IOException if the directory cannot be listed or reading its entries fails
+     */
+    @VisibleForTesting
+    @Nonnull
+    static Collection<Path> listAllocationDirectoriesIn(File localStateRootDirectory)
+            throws IOException {
+        // Files.list holds an open directory handle until the returned stream is closed.
+        try (java.util.stream.Stream<Path> entries =
+                Files.list(localStateRootDirectory.toPath())) {
+            return entries
+                    .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
+                    .collect(Collectors.toList());
+        } catch (java.io.UncheckedIOException e) {
+            // errors while iterating the listing are thrown unchecked; rethrow them as the
+            // declared IOException so that callers handle every listing failure the same way
+            throw e.getCause();
+        }
+    }
+
     public void shutdown() {
         synchronized (lock) {
             if (closed) {
@@ -259,7 +343,7 @@ public class TaskExecutorLocalStateStoresManager {
 
     @VisibleForTesting
     String allocationSubDirString(AllocationID allocationID) {
-        return "aid_" + allocationID.toHexString();
+        return ALLOCATION_DIR_PREFIX + allocationID.toHexString();
     }
 
     private File[] allocationBaseDirectories(AllocationID allocationID) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5ccb6de..7e4afb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -2049,6 +2049,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
         log.debug("Recovered slot allocation snapshots {}.", slotAllocationSnapshots);
 
+        final Set<AllocationID> allocatedSlots = new HashSet<>();
         for (SlotAllocationSnapshot slotAllocationSnapshot : slotAllocationSnapshots) {
             try {
                 allocateSlot(
@@ -2066,7 +2067,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
             } catch (Exception e) {
                 log.debug("Cannot reallocate restored slot {}.", slotAllocationSnapshot, e);
             }
+
+            allocatedSlots.add(slotAllocationSnapshot.getAllocationId());
         }
+
+        localStateStoresManager.retainLocalStateForAllocations(allocatedSlots);
     }
 
     // 
------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 0c6e95b..5fd6ddf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -36,6 +36,8 @@ import org.apache.flink.util.Reference;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
+
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -44,7 +46,9 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collection;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -318,6 +322,53 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testRetainLocalStateForAllocationsDeletesUnretainedAllocationDirectories()
+            throws IOException {
+        final File localStateStore = temporaryFolder.newFolder();
+        final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
+                new TaskExecutorLocalStateStoresManager(
+                        true,
+                        Reference.owned(new File[] {localStateStore}),
+                        Executors.directExecutor());
+        final JobID jobId = new JobID();
+        final AllocationID retainedAllocationId = new AllocationID();
+        final AllocationID otherAllocationId = new AllocationID();
+        final JobVertexID jobVertexId = new JobVertexID();
+
+        // register local state stores for two different allocations
+        taskExecutorLocalStateStoresManager.localStateStoreForSubtask(
+                jobId, retainedAllocationId, jobVertexId, 0);
+        taskExecutorLocalStateStoresManager.localStateStoreForSubtask(
+                jobId, otherAllocationId, jobVertexId, 1);
+
+        final Collection<Path> allocationDirectories =
+                TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateStore);
+
+        assertThat(allocationDirectories).hasSize(2);
+
+        taskExecutorLocalStateStoresManager.retainLocalStateForAllocations(
+                Sets.newHashSet(retainedAllocationId));
+
+        final Collection<Path> allocationDirectoriesAfterCleanup =
+                TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateStore);
+
+        assertThat(allocationDirectoriesAfterCleanup).hasSize(1);
+        // the retained allocation's directory must survive; the other one must be deleted
+        assertThat(
+                        new File(
+                                localStateStore,
+                                taskExecutorLocalStateStoresManager.allocationSubDirString(
+                                        retainedAllocationId)))
+                .exists();
+        assertThat(
+                        new File(
+                                localStateStore,
+                                taskExecutorLocalStateStoresManager.allocationSubDirString(
+                                        otherAllocationId)))
+                .doesNotExist();
+    }
+
     private void checkRootDirsClean(File[] rootDirs) {
         for (File rootDir : rootDirs) {
             File[] files = rootDir.listFiles();