This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 57da458  [FLINK-15010][network] Add shut down hook to ensure cleanup 
netty shuffle directories
57da458 is described below

commit 57da4589e89672bcb042c8656b269e4faace1664
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Tue Feb 4 12:04:44 2020 +0800

    [FLINK-15010][network] Add shut down hook to ensure cleanup netty shuffle 
directories
    
    When the cluster is shut down in standalone mode, the task manager is shut 
down via SIG_TERM signal.
    In this case, the shuffle directories created by FileChannelManager would 
not be removed finally.
    
    To solve this issue, we register the shut down hook before creating 
directories in the constructor of FileChannelManagerImpl.
    
    This closes #10736
---
 .../runtime/io/disk/FileChannelManagerImpl.java    |  34 ++++
 .../io/disk/FileChannelManagerImplTest.java        | 179 +++++++++++++++++++++
 2 files changed, 213 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
index 2bdb8d9..b6df3dd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,10 +32,12 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The manager used for creating/deleting file channels based on config temp 
dirs.
@@ -51,12 +54,30 @@ public class FileChannelManagerImpl implements 
FileChannelManager {
        /** The number of the next path to use. */
        private volatile int nextPath;
 
+       /** Prefix of the temporary directories to create. */
+       private final String prefix;
+
+       /**
+        *  Flag to signal that the file channel manager has been shutdown 
already. The flag
+        *  should support concurrent access for cases like multiple shutdown 
hooks.
+        */
+       private final AtomicBoolean isShutdown = new AtomicBoolean();
+
+       /** Shutdown hook to make sure that the directories are removed on 
exit. */
+       private final Thread shutdownHook;
+
        public FileChannelManagerImpl(String[] tempDirs, String prefix) {
                checkNotNull(tempDirs, "The temporary directories must not be 
null.");
                checkArgument(tempDirs.length > 0, "The temporary directories 
must not be empty.");
 
                this.random = new Random();
                this.nextPath = 0;
+               this.prefix = prefix;
+
+               shutdownHook = ShutdownHookUtil.addShutdownHook(this, 
String.format("%s-%s", getClass().getSimpleName(), prefix), LOG);
+
+               // Creates directories after registering shutdown hook to 
ensure the directories can be
+               // removed if required.
                this.paths = createFiles(tempDirs, prefix);
        }
 
@@ -80,17 +101,23 @@ public class FileChannelManagerImpl implements 
FileChannelManager {
 
        @Override
        public ID createChannel() {
+               checkState(!isShutdown.get(), "File channel manager has 
shutdown.");
+
                int num = getNextPathNum();
                return new ID(paths[num], num, random);
        }
 
        @Override
        public Enumerator createChannelEnumerator() {
+               checkState(!isShutdown.get(), "File channel manager has 
shutdown.");
+
                return new Enumerator(paths, random);
        }
 
        @Override
        public File[] getPaths() {
+               checkState(!isShutdown.get(), "File channel manager has 
shutdown.");
+
                return Arrays.copyOf(paths, paths.length);
        }
 
@@ -99,10 +126,17 @@ public class FileChannelManagerImpl implements 
FileChannelManager {
         */
        @Override
        public void close() throws Exception {
+               // Marks shutdown and exits if it has already shutdown.
+               if (!isShutdown.compareAndSet(false, true)) {
+                       return;
+               }
+
                IOUtils.closeAll(Arrays.stream(paths)
                        .filter(File::exists)
                        .map(FileChannelManagerImpl::getFileCloser)
                        .collect(Collectors.toList()));
+
+               ShutdownHookUtil.removeShutdownHook(shutdownHook, 
String.format("%s-%s", getClass().getSimpleName(), prefix), LOG);
        }
 
        private static AutoCloseable getFileCloser(File path) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelManagerImplTest.java
new file mode 100644
index 0000000..33f0072
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelManagerImplTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests the logic of {@link FileChannelManagerImpl}.
+ */
+public class FileChannelManagerImplTest extends TestLogger {
+       private static final Logger LOG = 
LoggerFactory.getLogger(FileChannelManagerImplTest.class);
+
+       private static final String DIR_NAME_PREFIX = "manager-test";
+
+       /**
+        * Marker file indicating the test process is ready to be killed. We 
could not simply kill the process
+        * after FileChannelManager has created temporary files since we also 
need to ensure the caller has
+        * also registered the shutdown hook if <tt>callerHasHook</tt> is true.
+        */
+       private static final String SIGNAL_FILE_FOR_KILLING = "could-kill";
+
+       private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10);
+
+       @Rule
+       public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @Test
+       public void testDirectoriesCleanupOnKillWithoutCallerHook() throws 
Exception {
+               testDirectoriesCleanupOnKill(false);
+       }
+
+       @Test
+       public void testDirectoriesCleanupOnKillWithCallerHook() throws 
Exception {
+               testDirectoriesCleanupOnKill(true);
+       }
+
+       private void testDirectoriesCleanupOnKill(boolean callerHasHook) throws 
Exception {
+               assumeTrue(OperatingSystem.isLinux()
+                               || OperatingSystem.isFreeBSD()
+                               || OperatingSystem.isSolaris()
+                               || OperatingSystem.isMac());
+
+               File fileChannelDir = temporaryFolder.newFolder();
+               File signalDir = temporaryFolder.newFolder();
+               File signalFile = new File(signalDir.getAbsolutePath(), 
SIGNAL_FILE_FOR_KILLING);
+
+               FileChannelManagerTestProcess fileChannelManagerTestProcess = 
new FileChannelManagerTestProcess(
+                       callerHasHook,
+                       fileChannelDir.getAbsolutePath(),
+                       signalFile.getAbsolutePath());
+
+               try {
+                       fileChannelManagerTestProcess.startProcess();
+
+                       // Waits till the process has created temporary files 
and registered the corresponding shutdown hooks.
+                       TestJvmProcess.waitForMarkerFile(signalFile, 
TEST_TIMEOUT.toMillis());
+
+                       Process kill = Runtime.getRuntime().exec("kill " + 
fileChannelManagerTestProcess.getProcessId());
+                       kill.waitFor();
+                       assertEquals("Failed to send SIG_TERM to process", 0, 
kill.exitValue());
+
+                       Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
+                       while (fileChannelManagerTestProcess.isAlive() && 
deadline.hasTimeLeft()) {
+                               Thread.sleep(100);
+                       }
+
+                       assertFalse(
+                               "The file channel manager test process does not 
terminate in time, its output is: \n" +
+                                       
fileChannelManagerTestProcess.getProcessOutput(),
+                               fileChannelManagerTestProcess.isAlive());
+
+                       // Checks if the directories are cleared.
+                       assertFalse(
+                               "The file channel manager test process does not 
remove the tmp shuffle directories after termination, " +
+                                       "its output is \n" + 
fileChannelManagerTestProcess.getProcessOutput(),
+                               fileOrDirExists(fileChannelDir, 
DIR_NAME_PREFIX));
+               } finally {
+                       fileChannelManagerTestProcess.destroy();
+               }
+       }
+
+       private boolean fileOrDirExists(File rootTmpDir, String namePattern) {
+               File[] candidates = rootTmpDir.listFiles((dir, name) -> 
name.contains(namePattern));
+               return candidates != null && candidates.length > 0;
+       }
+
+       /**
+        * The {@link FileChannelManagerCleanupRunner} instance running in a 
separate JVM process.
+        */
+       private static class FileChannelManagerTestProcess extends 
TestJvmProcess {
+               private final boolean callerHasHook;
+               private final String tmpDirectories;
+               private final String signalFilePath;
+
+               FileChannelManagerTestProcess(boolean callerHasHook, String 
tmpDirectories, String signalFilePath) throws Exception {
+                       this.callerHasHook = callerHasHook;
+                       this.tmpDirectories = tmpDirectories;
+                       this.signalFilePath = signalFilePath;
+               }
+
+               @Override
+               public String getName() {
+                       return "File Channel Manager Test";
+               }
+
+               @Override
+               public String[] getJvmArgs() {
+                       return new String[]{
+                                       Boolean.toString(callerHasHook),
+                                       tmpDirectories,
+                                       signalFilePath
+                       };
+               }
+
+               @Override
+               public String getEntryPointClassName() {
+                       return FileChannelManagerCleanupRunner.class.getName();
+               }
+       }
+
+       /**
+        * The entry point class to test the file channel manager cleanup with 
shutdown hook.
+        */
+       public static class FileChannelManagerCleanupRunner {
+
+               public static void main(String[] args) throws Exception{
+                       boolean callerHasHook = Boolean.parseBoolean(args[0]);
+                       String tmpDirectory = args[1];
+                       String signalFilePath = args[2];
+
+                       FileChannelManager manager = new 
FileChannelManagerImpl(new String[]{tmpDirectory}, DIR_NAME_PREFIX);
+
+                       if (callerHasHook) {
+                               // Verifies the case that both 
FileChannelManager and its upper component
+                               // have registered shutdown hooks, like in 
IOManager.
+                               ShutdownHookUtil.addShutdownHook(() -> 
manager.close(), "Caller", LOG);
+                       }
+
+                       // Signals the main process to execute the kill action.
+                       new File(signalFilePath).createNewFile();
+
+                       // Blocks the process to wait to be killed.
+                       Thread.sleep(3 * TEST_TIMEOUT.toMillis());
+               }
+       }
+}

Reply via email to