[FLINK-5812] [core] Cleanups in FileSystem (round 2)

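Move the FileSystem safety net (the per-thread SafetyNetCloseableRegistry handling) out of FileSystem into the separate class FileSystemSafetyNet.
FileSystem.createAndSetFileSystemCloseableRegistryForThread() and FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread()
are replaced by FileSystemSafetyNet.initializeSafetyNetForThread() and FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread().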


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5902ea0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5902ea0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5902ea0e

Branch: refs/heads/master
Commit: 5902ea0e88c70f330c23b9ace94033ae34c84445
Parents: a1bfae9
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 15 17:58:37 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:24 2017 +0100

----------------------------------------------------------------------
 flink-core/pom.xml                              |   2 +-
 .../org/apache/flink/core/fs/FileSystem.java    |  52 +-------
 .../flink/core/fs/FileSystemSafetyNet.java      | 124 +++++++++++++++++++
 .../flink/util/AbstractCloseableRegistry.java   |   4 -
 .../core/fs/SafetyNetCloseableRegistryTest.java |   8 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  11 +-
 6 files changed, 140 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index e9738a2..0a0d06e 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -154,7 +154,7 @@ under the License.
                                        <parameter>
                                                <excludes 
combine.children="append">
                                                        
<exclude>org.apache.flink.api.common.ExecutionConfig#CONFIG_KEY</exclude>
-                                                       
<exclude>org.apache.flink.core.fs.FileSystem$FSKey</exclude>
+                                                       
<exclude>org.apache.flink.core.fs.FileSystem\$FSKey</exclude>
                                                        
<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
                                                        <!-- Breaking changes 
between 1.1 and 1.2. 
                                                        We ignore these changes 
because these are low-level, internal runtime configuration parameters -->

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 4149d5e..fab0f4d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -17,7 +17,7 @@
  */
 
 
-/**
+/*
  * This file is based on source code from the Hadoop Project 
(http://hadoop.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
  * additional information regarding copyright ownership.
@@ -30,12 +30,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -174,6 +169,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * application task finishes (or is canceled or failed). That way, the task's 
threads do not
  * leak connections.
  * 
+ * <p>Internal runtime code can explicitly obtain a FileSystem that does not 
use the safety
+ * net via {@link FileSystem#getUnguardedFileSystem(URI)}.
+ * 
  * @see FSDataInputStream
  * @see FSDataOutputStream
  */
@@ -198,57 +196,18 @@ public abstract class FileSystem {
 
        // 
------------------------------------------------------------------------
 
-       private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES 
= new ThreadLocal<>();
-
        private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = 
"org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
 
        private static final String MAPR_FILESYSTEM_CLASS = 
"org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
 
        private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(FileSystem.class);
-
        /** This lock guards the methods {@link #initOutPathLocalFS(Path, 
WriteMode, boolean)} and
         * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are 
otherwise susceptible to races */
        private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new 
ReentrantLock(true);
 
        // 
------------------------------------------------------------------------
 
-       /**
-        * Create a SafetyNetCloseableRegistry for a Task. This method should 
be called at the beginning of the task's
-        * main thread.
-        */
-       @Internal
-       public static void createAndSetFileSystemCloseableRegistryForThread() {
-               SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
-               Preconditions.checkState(null == oldRegistry,
-                               "Found old CloseableRegistry " + oldRegistry +
-                                               ". This indicates a leak of the 
InheritableThreadLocal through a ThreadPool!");
-
-               SafetyNetCloseableRegistry newRegistry = new 
SafetyNetCloseableRegistry();
-               REGISTRIES.set(newRegistry);
-               LOG.info("Created new CloseableRegistry " + newRegistry + " for 
{}", Thread.currentThread().getName());
-       }
-
-       /**
-        * Create a SafetyNetCloseableRegistry for a Task. This method should 
be called at the end of the task's
-        * main thread or when the task should be canceled.
-        */
-       @Internal
-       public static void 
closeAndDisposeFileSystemCloseableRegistryForThread() {
-               SafetyNetCloseableRegistry registry = REGISTRIES.get();
-               if (null != registry) {
-                       LOG.info("Ensuring all FileSystem streams are closed 
for {}", Thread.currentThread().getName());
-                       REGISTRIES.remove();
-                       IOUtils.closeQuietly(registry);
-               }
-       }
-
-       private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) 
{
-               SafetyNetCloseableRegistry reg = REGISTRIES.get();
-               return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : 
fs;
-       }
-
        /** Object used to protect calls to specific methods.*/
        private static final Object SYNCHRONIZATION_OBJECT = new Object();
 
@@ -427,7 +386,7 @@ public abstract class FileSystem {
         *         thrown if a reference to the file system instance could not 
be obtained
         */
        public static FileSystem get(URI uri) throws IOException {
-               return 
wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
+               return 
FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
        }
 
        /**
@@ -971,7 +930,6 @@ public abstract class FileSystem {
 
        /**
         * An identifier of a file system, via its scheme and its authority.
-        * This class needs to stay public, because it is detected as part of 
the public API.
         */
        private static final class FSKey {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
new file mode 100644
index 0000000..b18cb13
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The FileSystemSafetyNet can be used to guard a thread against {@link FileSystem} stream resource leaks.
+ * When activated for a thread, it tracks all streams that are opened by FileSystems that the thread
+ * obtains. Closing the safety net for that thread closes all tracked streams that were not properly closed.
+ * 
+ * <p>The main thread and the checkpointing thread of each Flink task are automatically guarded by this safety net.
+ * 
+ * <p><b>Important:</b> This safety net works only for streams created by Flink's FileSystem abstraction,
+ * i.e., for {@code FileSystem} instances obtained via {@link FileSystem#get(URI)} or through
+ * {@link Path#getFileSystem()}.
+ * 
+ * <p><b>Important:</b> When a guarded thread obtains a {@code FileSystem} or a stream and passes them
+ * to another thread, the safety net will close those resources once the guarded thread closes its safety net.
+ * 
+ * <p>The safety net can be used as follows:
+ * <pre>{@code
+ * 
+ * class GuardedThread extends Thread {
+ * 
+ *     public void run() {
+ *         FileSystemSafetyNet.initializeSafetyNetForThread();
+ *         try {
+ *             // do some heavy stuff where you are unsure whether it closes all streams
+ *             // like some untrusted user code or library code
+ *         }
+ *         finally {
+ *             FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+ *         }
+ *     }
+ * }
+ * }</pre>
+ */
+@Internal
+public final class FileSystemSafetyNet {
+
+       private static final Logger LOG = LoggerFactory.getLogger(FileSystemSafetyNet.class);
+
+       /** Holds the safety net registry of each guarded thread; unguarded threads see {@code null}. */
+       private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
+
+       // ------------------------------------------------------------------------
+       //  Activating / Deactivating
+       // ------------------------------------------------------------------------
+
+       /**
+        * Activates the safety net for the calling thread. {@link FileSystem} instances obtained by this thread
+        * afterwards are guarded; the streams they create are tracked and closed when the safety net is closed.
+        * 
+        * <p>This method should be called at the beginning of a thread that should be guarded.
+        * 
+        * @throws IllegalStateException Thrown, if a safety net was already registered for the thread.
+        */
+       @Internal
+       public static void initializeSafetyNetForThread() {
+               SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
+
+               checkState(null == oldRegistry, "Found an existing FileSystem safety net for this thread: %s. " +
+                               "This may indicate an accidental repeated initialization, or a leak of the " +
+                               "(Inheritable)ThreadLocal through a ThreadPool.", oldRegistry);
+
+               SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
+               REGISTRIES.set(newRegistry);
+               LOG.info("Created new CloseableRegistry {} for {}", newRegistry, Thread.currentThread().getName());
+       }
+
+       /**
+        * Closes the safety net for a thread. This closes all remaining unclosed streams that were opened
+        * by safety-net-guarded file systems. After this method was called, no streams can be opened any more
+        * from any FileSystem instance that was obtained while the thread was guarded by the safety net.
+        * 
+        * <p>This method should be called at the very end of a guarded thread.
+        */
+       @Internal
+       public static void closeSafetyNetAndGuardedResourcesForThread() {
+               SafetyNetCloseableRegistry registry = REGISTRIES.get();
+               if (null != registry) {
+                       LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
+                       REGISTRIES.remove();
+                       IOUtils.closeQuietly(registry);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) {
+               SafetyNetCloseableRegistry reg = REGISTRIES.get();
+               return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
+       }
+
+       /** Utility class, not meant to be instantiated. */
+       private FileSystemSafetyNet() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index 2b7a8c8..766ede9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -108,10 +108,6 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
                return closeableToRef;
        }
 
-       // 
------------------------------------------------------------------------
-       //  
-       // 
------------------------------------------------------------------------
-
        protected abstract void doUnRegister(C closeable, Map<Closeable, T> 
closeableMap);
 
        protected abstract void doRegister(C closeable, Map<Closeable, T> 
closeableMap) throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 6870780..7973c69 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -77,7 +77,7 @@ public class SafetyNetCloseableRegistryTest {
                                        FileSystem fs1 = 
FileSystem.getLocalFileSystem();
                                        // ensure no safety net in place
                                        Assert.assertFalse(fs1 instanceof 
SafetyNetWrapperFileSystem);
-                                       
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+                                       
FileSystemSafetyNet.initializeSafetyNetForThread();
                                        fs1 = FileSystem.getLocalFileSystem();
                                        // ensure safety net is in place now
                                        Assert.assertTrue(fs1 instanceof 
SafetyNetWrapperFileSystem);
@@ -91,11 +91,11 @@ public class SafetyNetCloseableRegistryTest {
                                                                FileSystem fs2 
= FileSystem.getLocalFileSystem();
                                                                // ensure the 
safety net does not leak here
                                                                
Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
-                                                               
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+                                                               
FileSystemSafetyNet.initializeSafetyNetForThread();
                                                                fs2 = 
FileSystem.getLocalFileSystem();
                                                                // ensure we 
can bring another safety net in place
                                                                
Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem);
-                                                               
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+                                                               
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                                                fs2 = 
FileSystem.getLocalFileSystem();
                                                                // and that we 
can remove it again
                                                                
Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
@@ -107,7 +107,7 @@ public class SafetyNetCloseableRegistryTest {
 
                                                //ensure stream is still open 
and was never closed by any interferences
                                                stream.write(42);
-                                               
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+                                               
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 
                                                // ensure leaking stream was 
closed
                                                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 64a83c9..acb423b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -552,7 +552,7 @@ public class Task implements Runnable, TaskActions {
                        // ----------------------------
 
                        // activate safety net for task thread
-                       
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+                       FileSystemSafetyNet.initializeSafetyNetForThread();
 
                        // first of all, get a user-code classloader
                        // this may involve downloading the job's JAR files 
and/or classes
@@ -789,8 +789,9 @@ public class Task implements Runnable, TaskActions {
 
                                // remove all files in the distributed cache
                                removeCachedFiles(distributedCacheEntries, 
fileCache);
+
                                // close and de-activate safety net for task 
thread
-                               
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+                               
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 
                                notifyFinalState();
                        }
@@ -1131,7 +1132,7 @@ public class Task implements Runnable, TaskActions {
                                        @Override
                                        public void run() {
                                                // activate safety net for 
checkpointing thread
-                                               
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+                                               
FileSystemSafetyNet.initializeSafetyNetForThread();
                                                try {
                                                        boolean success = 
statefulTask.triggerCheckpoint(checkpointMetaData);
                                                        if (!success) {
@@ -1152,7 +1153,7 @@ public class Task implements Runnable, TaskActions {
                                                        }
                                                } finally {
                                                        // close and 
de-activate safety net for checkpointing thread
-                                                       
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+                                                       
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                                }
                                        }
                                };

Reply via email to