This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7b141b4b8767c9b7c8c72ef21055fd65908e848
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Fri Mar 22 14:57:09 2019 +0100

    [FLINK-11952][3/4] Integrate plugin mechanism with FileSystem
---
 .../java/org/apache/flink/core/fs/FileSystem.java  | 109 ++++++++++++++-------
 1 file changed, 76 insertions(+), 33 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index e7a3765..d159e70 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.fs.local.LocalFileSystemFactory;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -44,12 +45,14 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ServiceLoader;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -217,11 +220,8 @@ public abstract class FileSystem {
        /** Cache for file systems, by scheme + authority. */
        private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();
 
-       /** All available file system factories. */
-       private static final List<FileSystemFactory> RAW_FACTORIES = 
loadFileSystems();
-
        /** Mapping of file system schemes to the corresponding factories,
-        * populated in {@link FileSystem#initialize(Configuration)}. */
+        * populated in {@link FileSystem#initialize(Configuration, 
PluginManager)}. */
        private static final HashMap<String, FileSystemFactory> FS_FACTORIES = 
new HashMap<>();
 
        /** The default factory that is used when no scheme matches. */
@@ -249,17 +249,57 @@ public abstract class FileSystem {
         * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
         * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
         *
+        * @deprecated use {@link #initialize(Configuration, PluginManager)} 
instead.
+        *
         * @param config the configuration from where to fetch the parameter.
         */
+       @Deprecated
        public static void initialize(Configuration config) throws 
IllegalConfigurationException {
+               initializeWithoutPlugins(config);
+       }
+
+       private static void initializeWithoutPlugins(Configuration config) 
throws IllegalConfigurationException {
+               initialize(config, null);
+       }
+
+       /**
+        * Initializes the shared file system settings.
+        *
+        * <p>The given configuration is passed to each file system factory to 
initialize the respective
+        * file systems. Because the configuration of file systems may be 
different subsequent to the call
+        * of this method, this method clears the file system instance cache.
+        *
+        * <p>This method also reads the default file system URI from the 
configuration key
+        * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link 
FileSystem#get(URI)} where
+        * the URI has no scheme will be interpreted as relative to that URI.
+        * As an example, assume the default file system URI is set to {@code 
'hdfs://localhost:9000/'}.
+        * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
+        * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
+        *
+        * @param config the configuration from where to fetch the parameter.
+        * @param pluginManager optional plugin manager that is used to 
initialized filesystems provided as plugins.
+        */
+       public static void initialize(
+               Configuration config,
+               PluginManager pluginManager) throws 
IllegalConfigurationException {
+
                LOCK.lock();
                try {
                        // make sure file systems are re-instantiated after 
re-configuration
                        CACHE.clear();
                        FS_FACTORIES.clear();
 
+                       Collection<Supplier<Iterator<FileSystemFactory>>> 
factorySuppliers = new ArrayList<>(2);
+                       factorySuppliers.add(() -> 
ServiceLoader.load(FileSystemFactory.class).iterator());
+
+                       if (pluginManager != null) {
+                               factorySuppliers.add(() -> 
pluginManager.load(FileSystemFactory.class));
+                       }
+
+                       final List<FileSystemFactory> fileSystemFactories = 
loadFileSystemFactories(factorySuppliers);
+
                        // configure all file system factories
-                       for (FileSystemFactory factory : RAW_FACTORIES) {
+                       for (FileSystemFactory factory : fileSystemFactories) {
                                factory.configure(config);
                                String scheme = factory.getScheme();
 
@@ -384,7 +424,7 @@ public abstract class FileSystem {
                        // even when not configured with an explicit Flink 
configuration, like on
                        // JobManager or TaskManager setup
                        if (FS_FACTORIES.isEmpty()) {
-                               initialize(new Configuration());
+                               initializeWithoutPlugins(new Configuration());
                        }
 
                        // Try to create a new file system
@@ -944,7 +984,9 @@ public abstract class FileSystem {
         *
         * @return A map from the file system scheme to corresponding file 
system factory.
         */
-       private static List<FileSystemFactory> loadFileSystems() {
+       private static List<FileSystemFactory> loadFileSystemFactories(
+               Collection<Supplier<Iterator<FileSystemFactory>>> 
factoryIteratorsSuppliers) {
+
                final ArrayList<FileSystemFactory> list = new ArrayList<>();
 
                // by default, we always have the local file system factory
@@ -952,38 +994,39 @@ public abstract class FileSystem {
 
                LOG.debug("Loading extension file systems via services");
 
-               try {
-                       ServiceLoader<FileSystemFactory> serviceLoader = 
ServiceLoader.load(FileSystemFactory.class);
-                       Iterator<FileSystemFactory> iter = 
serviceLoader.iterator();
-
-                       // we explicitly use an iterator here (rather than 
for-each) because that way
-                       // we can catch errors in individual service 
instantiations
-
-                       //noinspection WhileLoopReplaceableByForEach
-                       while (iter.hasNext()) {
-                               try {
-                                       FileSystemFactory factory = iter.next();
-                                       list.add(factory);
-                                       LOG.debug("Added file system {}:{}", 
factory.getScheme(), factory.getClass().getName());
-                               }
-                               catch (Throwable t) {
-                                       // catching Throwable here to handle 
various forms of class loading
-                                       // and initialization errors
-                                       
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-                                       LOG.error("Failed to load a file system 
via services", t);
-                               }
+               for (Supplier<Iterator<FileSystemFactory>> 
factoryIteratorsSupplier : factoryIteratorsSuppliers) {
+                       try {
+                               
addAllFactoriesToList(factoryIteratorsSupplier.get(), list);
+                       } catch (Throwable t) {
+                               // catching Throwable here to handle various 
forms of class loading
+                               // and initialization errors
+                               ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                               LOG.error("Failed to load additional file 
systems via services", t);
                        }
                }
-               catch (Throwable t) {
-                       // catching Throwable here to handle various forms of 
class loading
-                       // and initialization errors
-                       ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-                       LOG.error("Failed to load additional file systems via 
services", t);
-               }
 
                return Collections.unmodifiableList(list);
        }
 
+       private static void addAllFactoriesToList(Iterator<FileSystemFactory> 
iter, List<FileSystemFactory> list) {
+               // we explicitly use an iterator here (rather than for-each) 
because that way
+               // we can catch errors in individual service instantiations
+
+               while (iter.hasNext()) {
+                       try {
+                               FileSystemFactory factory = iter.next();
+                               list.add(factory);
+                               LOG.debug("Added file system {}:{}", 
factory.getScheme(), factory.getClass().getName());
+                       }
+                       catch (Throwable t) {
+                               // catching Throwable here to handle various 
forms of class loading
+                               // and initialization errors
+                               ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                               LOG.error("Failed to load a file system via 
services", t);
+                       }
+               }
+       }
+
        /**
         * Utility loader for the Hadoop file system factory.
         * We treat the Hadoop FS factory in a special way, because we use it 
as a catch

Reply via email to