[FLINK-5812] [core] Cleanups in FileSystem (round 2) Move the FileSystem safety net to a separate class.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5902ea0e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5902ea0e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5902ea0e Branch: refs/heads/master Commit: 5902ea0e88c70f330c23b9ace94033ae34c84445 Parents: a1bfae9 Author: Stephan Ewen <se...@apache.org> Authored: Wed Feb 15 17:58:37 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 20 01:01:24 2017 +0100 ---------------------------------------------------------------------- flink-core/pom.xml | 2 +- .../org/apache/flink/core/fs/FileSystem.java | 52 +------- .../flink/core/fs/FileSystemSafetyNet.java | 124 +++++++++++++++++++ .../flink/util/AbstractCloseableRegistry.java | 4 - .../core/fs/SafetyNetCloseableRegistryTest.java | 8 +- .../apache/flink/runtime/taskmanager/Task.java | 11 +- 6 files changed, 140 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index e9738a2..0a0d06e 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -154,7 +154,7 @@ under the License. <parameter> <excludes combine.children="append"> <exclude>org.apache.flink.api.common.ExecutionConfig#CONFIG_KEY</exclude> - <exclude>org.apache.flink.core.fs.FileSystem$FSKey</exclude> + <exclude>org.apache.flink.core.fs.FileSystem\$FSKey</exclude> <exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude> <!-- Breaking changes between 1.1 and 1.2. 
We ignore these changes because these are low-level, internal runtime configuration parameters --> http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 4149d5e..fab0f4d 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -17,7 +17,7 @@ */ -/** +/* * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. @@ -30,12 +30,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.local.LocalFileSystem; -import org.apache.flink.util.IOUtils; import org.apache.flink.util.OperatingSystem; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.File; @@ -174,6 +169,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * application task finishes (or is canceled or failed). That way, the task's threads do not * leak connections. * + * <p>Internal runtime code can explicitly obtain a FileSystem that does not use the safety + * net via {@link FileSystem#getUnguardedFileSystem(URI)}. 
+ * * @see FSDataInputStream * @see FSDataOutputStream */ @@ -198,57 +196,18 @@ public abstract class FileSystem { // ------------------------------------------------------------------------ - private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>(); - private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem"; private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem"; private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper"; - private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class); - /** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */ private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true); // ------------------------------------------------------------------------ - /** - * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's - * main thread. - */ - @Internal - public static void createAndSetFileSystemCloseableRegistryForThread() { - SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get(); - Preconditions.checkState(null == oldRegistry, - "Found old CloseableRegistry " + oldRegistry + - ". This indicates a leak of the InheritableThreadLocal through a ThreadPool!"); - - SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry(); - REGISTRIES.set(newRegistry); - LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName()); - } - - /** - * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the end of the task's - * main thread or when the task should be canceled. 
- */ - @Internal - public static void closeAndDisposeFileSystemCloseableRegistryForThread() { - SafetyNetCloseableRegistry registry = REGISTRIES.get(); - if (null != registry) { - LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName()); - REGISTRIES.remove(); - IOUtils.closeQuietly(registry); - } - } - - private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) { - SafetyNetCloseableRegistry reg = REGISTRIES.get(); - return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs; - } - /** Object used to protect calls to specific methods.*/ private static final Object SYNCHRONIZATION_OBJECT = new Object(); @@ -427,7 +386,7 @@ public abstract class FileSystem { * thrown if a reference to the file system instance could not be obtained */ public static FileSystem get(URI uri) throws IOException { - return wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri)); + return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri)); } /** @@ -971,7 +930,6 @@ public abstract class FileSystem { /** * An identifier of a file system, via its scheme and its authority. - * This class needs to stay public, because it is detected as part of the public API. */ private static final class FSKey { http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java new file mode 100644 index 0000000..b18cb13 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The FileSystemSafetyNet can be used to guard a thread against {@link FileSystem} stream resource leaks. + * When activated for a thread, it tracks all streams that are opened by FileSystems that the thread + * obtains. The safety net has a per-thread cleanup hook that will close all streams that were + * not properly closed. + * + * <p>The main thread of each Flink task, as well as the checkpointing thread, are automatically guarded + * by this safety net. + * + * <p><b>Important:</b> This safety net works only for streams created by Flink's FileSystem abstraction, + * i.e., for {@code FileSystem} instances obtained via {@link FileSystem#get(URI)} or through + * {@link Path#getFileSystem()}. + * + * <p><b>Important:</b> When a guarded thread obtains a {@code FileSystem} or a stream and passes them + * to another thread, the safety net will close those resources once the former thread finishes.
+ * + * <p>The safety net can be used as follows: + * <pre>{@code + * + * class GuardedThread extends Thread { + * + * public void run() { + * FileSystemSafetyNet.initializeSafetyNetForThread(); + * try { + * // do some heavy stuff where you are unsure whether it closes all streams + * // like some untrusted user code or library code + * } + * finally { + * FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); + * } + * } + * } + * }</pre> + */ +@Internal +public class FileSystemSafetyNet { + + private static final Logger LOG = LoggerFactory.getLogger(FileSystemSafetyNet.class); + + /** The map from thread to the safety net registry for that thread. */ + private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>(); + + // ------------------------------------------------------------------------ + // Activating / Deactivating + // ------------------------------------------------------------------------ + + /** + * Activates the safety net for a thread. {@link FileSystem} instances obtained by the thread + * that called this method will be guarded, meaning that their created streams are tracked and can + * be closed via the safety net closing hook. + * + * <p>This method should be called at the beginning of a thread that should be guarded. + * + * @throws IllegalStateException Thrown, if a safety net was already registered for the thread.
+ */ + @Internal + public static void initializeSafetyNetForThread() { + SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get(); + + checkState(null == oldRegistry, "Found an existing FileSystem safety net for this thread: %s " + + "This may indicate an accidental repeated initialization, or a leak of the" + + "(Inheritable)ThreadLocal through a ThreadPool.", oldRegistry); + + SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry(); + REGISTRIES.set(newRegistry); + LOG.info("Created new CloseableRegistry {} for {}", newRegistry, Thread.currentThread().getName()); + } + + /** + * Closes the safety net for a thread. This closes all remaining unclosed streams that were opened + * by safety-net-guarded file systems. After this method was called, no streams can be opened any more + * from any FileSystem instance that was obtained while the thread was guarded by the safety net. + * + * <p>This method should be called at the very end of a guarded thread. + */ + @Internal + public static void closeSafetyNetAndGuardedResourcesForThread() { + SafetyNetCloseableRegistry registry = REGISTRIES.get(); + if (null != registry) { + LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName()); + REGISTRIES.remove(); + IOUtils.closeQuietly(registry); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) { + SafetyNetCloseableRegistry reg = REGISTRIES.get(); + return reg != null ? 
new SafetyNetWrapperFileSystem(fs, reg) : fs; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java index 2b7a8c8..766ede9 100644 --- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java @@ -108,10 +108,6 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen return closeableToRef; } - // ------------------------------------------------------------------------ - // - // ------------------------------------------------------------------------ - protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap); protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java index 6870780..7973c69 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java @@ -77,7 +77,7 @@ public class SafetyNetCloseableRegistryTest { FileSystem fs1 = FileSystem.getLocalFileSystem(); // ensure no safety net in place Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem); - FileSystem.createAndSetFileSystemCloseableRegistryForThread(); + 
FileSystemSafetyNet.initializeSafetyNetForThread(); fs1 = FileSystem.getLocalFileSystem(); // ensure safety net is in place now Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem); @@ -91,11 +91,11 @@ public class SafetyNetCloseableRegistryTest { FileSystem fs2 = FileSystem.getLocalFileSystem(); // ensure the safety net does not leak here Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem); - FileSystem.createAndSetFileSystemCloseableRegistryForThread(); + FileSystemSafetyNet.initializeSafetyNetForThread(); fs2 = FileSystem.getLocalFileSystem(); // ensure we can bring another safety net in place Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem); - FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); + FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); fs2 = FileSystem.getLocalFileSystem(); // and that we can remove it again Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem); @@ -107,7 +107,7 @@ public class SafetyNetCloseableRegistryTest { //ensure stream is still open and was never closed by any interferences stream.write(42); - FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); + FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); // ensure leaking stream was closed try { http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 64a83c9..acb423b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.cache.DistributedCache; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemSafetyNet; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; @@ -552,7 +552,7 @@ public class Task implements Runnable, TaskActions { // ---------------------------- // activate safety net for task thread - FileSystem.createAndSetFileSystemCloseableRegistryForThread(); + FileSystemSafetyNet.initializeSafetyNetForThread(); // first of all, get a user-code classloader // this may involve downloading the job's JAR files and/or classes @@ -789,8 +789,9 @@ public class Task implements Runnable, TaskActions { // remove all files in the distributed cache removeCachedFiles(distributedCacheEntries, fileCache); + // close and de-activate safety net for task thread - FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); + FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); notifyFinalState(); } @@ -1131,7 +1132,7 @@ public class Task implements Runnable, TaskActions { @Override public void run() { // activate safety net for checkpointing thread - FileSystem.createAndSetFileSystemCloseableRegistryForThread(); + FileSystemSafetyNet.initializeSafetyNetForThread(); try { boolean success = statefulTask.triggerCheckpoint(checkpointMetaData); if (!success) { @@ -1152,7 +1153,7 @@ public class Task implements Runnable, TaskActions { } } finally { // close and de-activate safety net for checkpointing thread - FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); + FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } } };