IGNITE-2206: intermediate saving commit.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89690022
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89690022
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89690022

Branch: refs/heads/ignite-2206
Commit: 89690022863a32fb00e76335c02c6a36bfae55e1
Parents: ba11b3a
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Tue Dec 22 22:51:07 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Tue Dec 22 22:51:07 2015 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopPayloadAware.java   |  12 ++
 .../processors/hadoop/PayloadAware.java         |  13 --
 .../internal/processors/igfs/IgfsImpl.java      |  12 +-
 .../internal/processors/igfs/IgfsPaths.java     |  37 +---
 .../hadoop/fs/HadoopFileSystemFactory.java      |  29 ---
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |  19 +-
 .../fs/v1/DefaultHadoopFileSystemFactory.java   | 185 ++++++++++++++++++
 .../hadoop/fs/v1/HadoopFileSystemFactory.java   |  21 +++
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  27 ++-
 .../fs/v2/HadoopAbstractFileSystemFactory.java  |  21 +++
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |   7 +-
 .../fs/DefaultHadoopFileSystemFactory.java      | 187 -------------------
 .../hadoop/fs/HadoopLazyConcurrentMap.java      |   4 +
 .../IgniteHadoopFileSystemAbstractSelfTest.java |  12 +-
 ...teHadoopFileSystemShmemAbstractSelfTest.java |   2 -
 15 files changed, 284 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
new file mode 100644
index 0000000..dcb163f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
@@ -0,0 +1,12 @@
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ *
+ */
+public interface HadoopPayloadAware {
+    /**
+     *
+     * @return
+     */
+    public Object getPayload();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/PayloadAware.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/PayloadAware.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/PayloadAware.java
deleted file mode 100644
index dc4ff5e..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/PayloadAware.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.ignite.internal.processors.hadoop;
-
-/**
- * Created by ivan on 22.12.15.
- */
-public interface PayloadAware <P> {
-
-    /**
-     *
-     * @return
-     */
-    public P getPayload();
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index fb93ea1..7453e15 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -73,7 +72,7 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.hadoop.PayloadAware;
+import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -125,7 +124,7 @@ public final class IgfsImpl implements IgfsEx {
     static final Map<String, String> DFLT_DIR_META = F.asMap(PROP_PERMISSION, 
PERMISSION_DFLT_VAL);
 
     /** Handshake message. */
-    private final IgfsPaths<Serializable> secondaryPaths;
+    private final IgfsPaths secondaryPaths;
 
     /** Cache based structure (meta data) manager. */
     private IgfsMetaManager meta;
@@ -260,11 +259,10 @@ public final class IgfsImpl implements IgfsEx {
 
         modeRslvr = new IgfsModeResolver(dfltMode, modes);
 
-        Serializable secondaryFsPayload = null;
+        Object secondaryFsPayload = null;
 
-        if (secondaryFs instanceof PayloadAware) {
-            secondaryFsPayload = ((PayloadAware<Serializable>) 
secondaryFs).getPayload();
-        }
+        if (secondaryFs instanceof HadoopPayloadAware)
+            secondaryFsPayload = ((HadoopPayloadAware) 
secondaryFs).getPayload();
 
         secondaryPaths = new IgfsPaths(
             //secondaryFs == null ? null : secondaryFs.properties(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
index d434d01..83451db 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
@@ -25,12 +25,10 @@ import java.io.ObjectInput;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.processors.hadoop.PayloadAware;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -38,16 +36,12 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Description of path modes.
  */
-public class IgfsPaths <P extends Serializable> implements Externalizable, 
PayloadAware<P> {
+public class IgfsPaths implements Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
-//    /** Additional secondary file system properties. */
-//    @Deprecated
-//    private Map<String, String> props;
-
     /** */
-    private P payload;
+    private Object payload;
 
     /** Default IGFS mode. */
     private IgfsMode dfltMode;
@@ -68,26 +62,14 @@ public class IgfsPaths <P extends Serializable> implements 
Externalizable, Paylo
      * @param dfltMode Default IGFS mode.
      * @param pathModes Path modes.
      */
-    public IgfsPaths(//Map<String, String> props,
-                     P payload,
+    public IgfsPaths(Object payload,
                      IgfsMode dfltMode,
                      @Nullable List<T2<IgfsPath, IgfsMode>> pathModes) {
-        //this.props = props;
         this.payload = payload;
         this.dfltMode = dfltMode;
         this.pathModes = pathModes;
     }
 
-//    /**
-//     * @return Secondary file system properties.
-//     *
-//     * @deprecated
-//     */
-//    @Deprecated
-//    public Map<String, String> properties() {
-//        return props;
-//    }
-
     /**
      * @return Default IGFS mode.
      */
@@ -102,15 +84,6 @@ public class IgfsPaths <P extends Serializable> implements 
Externalizable, Paylo
         return pathModes;
     }
 
-//    /**
-//     * Getter for factory.
-//     *
-//     * @return The factory.
-//     */
-//    public HadoopFileSystemFactory<F> factory() {
-//        return factory;
-//    }
-
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
 //        U.writeStringMap(out, props);
@@ -188,14 +161,14 @@ public class IgfsPaths <P extends Serializable> 
implements Externalizable, Paylo
         ObjectInput oi = new ObjectInputStream(new 
ByteArrayInputStream(factoryBytes));
 
         try {
-            payload = (P) oi.readObject();
+            payload = oi.readObject();
         }
         finally {
             oi.close();
         }
     }
 
-    @Override public P getPayload() {
+    public Object getPayload() {
         return payload;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
deleted file mode 100644
index 5337f12..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.ignite.hadoop.fs;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * This factory is {@link Serializable} because it should be transferable over 
the network.
- *
- * @param <T> The type
- */
-public interface HadoopFileSystemFactory <T> extends Serializable {
-    /**
-     * Gets the file system, possibly creating it or taking a cached instance.
-     * All the other data needed for the file system creation are expected to 
be contained
-     * in this object instance.
-     *
-     * @param userName The user name
-     * @return The file system.
-     * @throws IOException On error.
-     */
-    public T get(String userName) throws IOException;
-
-//    /**
-//     * Getter for the file system URI.
-//     *
-//     * @return The file system URI.
-//     */
-//    public URI uri();
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 7ba136b..86ed7a0 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.v1.HadoopFileSystemFactory;
 import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
 import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsFile;
@@ -45,8 +46,8 @@ import org.apache.ignite.igfs.IgfsPathNotFoundException;
 import org.apache.ignite.igfs.IgfsUserContext;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import 
org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.processors.hadoop.PayloadAware;
-import 
org.apache.ignite.internal.processors.hadoop.fs.DefaultHadoopFileSystemFactory;
+import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
+import org.apache.ignite.hadoop.fs.v1.DefaultHadoopFileSystemFactory;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties;
 import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
 import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
@@ -69,7 +70,7 @@ import static 
org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
  * see {@link IgfsUserContext#currentUser()}.
  */
 public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSystem,
-        LifecycleAware, PayloadAware<HadoopFileSystemFactory<FileSystem>> {
+        LifecycleAware, HadoopPayloadAware {
 //    /** Properties of file system, see {@link #properties()}
 //     * */
 //    private final Map<String, String> props = new HashMap<>();
@@ -97,7 +98,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
     private String dfltUserName = IgfsUtils.fixUserName(null);
 
     /** */
-    private HadoopFileSystemFactory<FileSystem> fsFactory;
+    private HadoopFileSystemFactory fsFactory;
 
     private final AtomicBoolean started = new AtomicBoolean();
 
@@ -174,20 +175,18 @@ public class IgniteHadoopIgfsSecondaryFileSystem 
implements IgfsSecondaryFileSys
             fac.setUri(uri);
 
         if (cfgPath != null)
-            fac.setCfgPaths(Collections.singletonList(cfgPath));
+            fac.setConfigPaths(Collections.singletonList(cfgPath));
 
         setFsFactory(fac);
 
         setDfltUserName(userName);
-
-        start();
     }
 
     /**
      *
      * @param factory
      */
-    public void setFsFactory(HadoopFileSystemFactory<FileSystem> factory) {
+    public void setFsFactory(HadoopFileSystemFactory factory) {
         A.ensure(factory != null, "Factory value must not be null.");
 
         this.fsFactory = factory;
@@ -600,7 +599,6 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
 
     /**
      * Should be invoked by client (from Spring?) after all the setters 
invoked.
-     * TODO: how this should be invoked?
      *
      * @throws IgniteCheckedException
      */
@@ -609,6 +607,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
         A.ensure(fsFactory != null, "factory");
         A.ensure(dfltUserName != null, "dfltUserName");
 
+        // Avoid starting more than once: only the first call performs the start-up below.
         if (started.compareAndSet(false, true)) {
             if (fsFactory instanceof LifecycleAware)
                 ((LifecycleAware) fsFactory).start();
@@ -638,7 +637,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSys
         close();
     }
 
-    @Override public HadoopFileSystemFactory<FileSystem> getPayload() {
+    @Override public HadoopFileSystemFactory getPayload() {
         return fsFactory;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java
new file mode 100644
index 0000000..ba4bfdd
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java
@@ -0,0 +1,185 @@
+package org.apache.ignite.hadoop.fs.v1;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.igfs.IgfsPaths;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+import static org.apache.ignite.internal.util.lang.GridFunc.nullifyEmpty;
+
+/**
+ * The class is to be instantiated as a Spring bean, so it must have a public 
zero-arg constructor.
+ * The class is serializable as it will be transferred over the network as 
part of an {@link IgfsPaths} object.
+ */
+public class DefaultHadoopFileSystemFactory implements 
HadoopFileSystemFactory, Externalizable, LifecycleAware {
+    /** Lazy per-user cache for the file systems. It is closed in the 
{@link #stop()} method. */
+    private final transient HadoopLazyConcurrentMap<String, FileSystem> 
fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
+            @Override public FileSystem createValue(String key) {
+                try {
+                    assert !F.isEmpty(key);
+
+                    return createFileSystem(key);
+                }
+                catch (IOException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
+    /** Configuration of the secondary filesystem; created in {@link #start()} and null before that. */
+    protected transient Configuration cfg;
+
+    /** */
+    protected transient URI uri;
+
+    /** */
+    protected String uriStr;
+
+    /** */
+    protected List<String> cfgPathStr;
+
+    int getCount = 0;
+
+    /**
+     *
+     */
+    public DefaultHadoopFileSystemFactory() {
+        //
+
+
+
+    }
+
+    @Override public FileSystem get(String userName) throws IOException {
+        A.ensure(cfg != null, "cfg");
+
+        if (getCount == 0)
+            assert fileSysLazyMap.size() == 0;
+
+        getCount++;
+
+        return fileSysLazyMap.getOrCreate(userName);
+    }
+
+    /**
+     * Uri setter.
+     * @param uriStr
+     */
+    public void setUri(String uriStr) {
+        this.uriStr = uriStr;
+    }
+
+    /**
+     * Configuration(s) setter, to be invoked from Spring config.
+     * @param cfgPaths
+     */
+    public void setConfigPaths(List<String> cfgPaths) {
+        this.cfgPathStr = (List)nullifyEmpty(cfgPaths);
+    }
+
+    /**
+     * @return {@link org.apache.hadoop.fs.FileSystem}  instance for this 
secondary Fs.
+     * @throws IOException
+     */
+    protected FileSystem createFileSystem(String userName) throws IOException {
+        userName = IgfsUtils.fixUserName(nullifyEmpty(userName));
+
+        assert cfg != null;
+
+        final FileSystem fileSys;
+
+        try {
+            fileSys = FileSystem.get(uri, cfg, userName);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IOException("Failed to create file system due to 
interrupt.", e);
+        }
+
+        return fileSys;
+    }
+
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, uriStr);
+
+        U.writeCollection(out, cfgPathStr);
+    }
+
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        uriStr = U.readString(in);
+
+        cfgPathStr = new ArrayList(U.readCollection(in));
+    }
+
+    @Override public void start() throws IgniteException {
+        cfg = HadoopUtils.safeCreateConfiguration();
+
+        if (cfgPathStr != null) {
+            for (String confPath : cfgPathStr) {
+                confPath = nullifyEmpty(confPath);
+
+                if (confPath != null) {
+                    URL url = U.resolveIgniteUrl(confPath);
+
+                    if (url == null) {
+                        // If secConfPath is given, it should be resolvable:
+                        throw new IllegalArgumentException("Failed to resolve 
secondary file system configuration path " +
+
+                            "(ensure that it exists locally and you have read 
access to it): " + confPath);
+                    }
+
+                    cfg.addResource(url);
+                }
+            }
+        }
+
+        // if secondary fs URI is not given explicitly, try to get it from the 
configuration:
+        if (uriStr == null)
+            uri = FileSystem.getDefaultUri(cfg);
+        else {
+            try {
+                uri = new URI(uriStr);
+            }
+            catch (URISyntaxException use) {
+                throw new IgniteException("Failed to resolve secondary file 
system URI: " + uriStr);
+            }
+        }
+
+        assert uriStr != null;
+
+        // Disable caching:
+        String prop = 
HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme());
+
+        cfg.setBoolean(prop, true);
+    }
+
+    @Override public void stop() throws IgniteException {
+        try {
+            fileSysLazyMap.close();
+        }
+        catch (IgniteCheckedException ice) {
+            throw new IgniteException(ice);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java
new file mode 100644
index 0000000..c1c7b9d
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java
@@ -0,0 +1,21 @@
+package org.apache.ignite.hadoop.fs.v1;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * This factory is {@link Serializable} because it should be transferable over 
the network.
+ */
+public interface HadoopFileSystemFactory extends Serializable {
+    /**
+     * Gets the file system, possibly creating it or taking a cached instance.
+     * All the other data needed for the file system creation are expected to 
be contained
+     * in this object instance.
+     *
+     * @param userName The user name
+     * @return The file system.
+     * @throws IOException On error.
+     */
+    public FileSystem get(String userName) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 932e326..355892e 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -18,12 +18,9 @@
 package org.apache.ignite.hadoop.fs.v1;
 
 import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -47,7 +44,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsFile;
@@ -71,6 +67,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
@@ -190,7 +187,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** {@inheritDoc} */
     @Override public URI getUri() {
         if (uri == null)
-            throw new IllegalStateException("URI is null (was 
IgniteHadoopFileSystem properly initialized?).");
+            throw new IllegalStateException("URI is null (was 
IgniteHadoopFileSystem properly initialized?) [closed="
+                + closeGuard.get() + ']');
 
         return uri;
     }
@@ -243,6 +241,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override public void initialize(URI name, Configuration cfg) throws 
IOException {
         enterBusy();
 
+        assert !closeGuard.get();
+
         try {
             if (rmtClient != null)
                 throw new IOException("File system is already initialized: " + 
rmtClient);
@@ -295,7 +295,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             igfsGrpBlockSize = handshake.blockSize();
 
-            final IgfsPaths<HadoopFileSystemFactory<FileSystem>> paths = 
handshake.secondaryPaths();
+            final IgfsPaths paths = handshake.secondaryPaths();
 
             // Initialize client logger.
             Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, 
uriAuthority, false);
@@ -336,16 +336,15 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
 //                byte[] secFsFacoryBytes = 
handshake.getSecondaryFileSystemFactoryBytes();
 
-                HadoopFileSystemFactory<FileSystem> factory = 
paths.getPayload();
+                HadoopFileSystemFactory factory = 
(HadoopFileSystemFactory)paths.getPayload();
 
                 A.ensure(factory != null, "Secondary file system factory 
should not be null.");
 
-                //secondaryUri = factory.uri();
+                if (factory instanceof LifecycleAware)
+                    ((LifecycleAware) factory).start();
 
                 try {
-                    //SecondaryFileSystemProvider secProvider = new 
SecondaryFileSystemProvider(secUri, secConfPath);
-
-                    secondaryFs = factory.get(user); 
//secProvider.createFileSystem(user);
+                    secondaryFs = factory.get(user);
 
                     secondaryUri = secondaryFs.getUri();
 
@@ -440,12 +439,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
         if (clientLog.isLogEnabled())
             clientLog.close();
 
-        if (secondaryFs != null)
-            U.closeQuiet(secondaryFs);
+        U.closeQuiet(secondaryFs);
+
+        System.out.println("closed " + uri);
 
         // Reset initialized resources.
         uri = null;
-        System.out.println("uri zeroed.");
         rmtClient = null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopAbstractFileSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopAbstractFileSystemFactory.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopAbstractFileSystemFactory.java
new file mode 100644
index 0000000..cf81e57
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopAbstractFileSystemFactory.java
@@ -0,0 +1,21 @@
+package org.apache.ignite.hadoop.fs.v2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.fs.AbstractFileSystem;
+
+/**
+ * This factory is {@link Serializable} because it should be transferable over 
the network.
+ */
+interface HadoopAbstractFileSystemFactory extends Serializable {
+    /**
+     * Gets the file system, possibly creating it or taking a cached instance.
+     * All the other data needed for the file system creation are expected to 
be contained
+     * in this object instance.
+     *
+     * @param userName The user name
+     * @return The file system.
+     * @throws IOException On error.
+     */
+    public AbstractFileSystem get(String userName) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index d3267c7..96f97dc 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsMode;
@@ -300,7 +299,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
 
             grpBlockSize = handshake.blockSize();
 
-            IgfsPaths<HadoopFileSystemFactory<AbstractFileSystem>> paths = 
handshake.secondaryPaths();
+            IgfsPaths paths = handshake.secondaryPaths();
 
             Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, 
uriAuthority, false);
 
@@ -338,8 +337,8 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
 //                String secUri = props.get(SECONDARY_FS_URI);
 //                String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
 
-                HadoopFileSystemFactory<AbstractFileSystem> factory
-                    = 
(HadoopFileSystemFactory<AbstractFileSystem>)paths.getPayload();
+                HadoopAbstractFileSystemFactory factory
+                    = (HadoopAbstractFileSystemFactory)paths.getPayload();
 
                 A.ensure(secondaryUri != null, "File system factory uri should 
not be null.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java
deleted file mode 100644
index bee0f25..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Collection;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsPaths;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
-
-import static org.apache.ignite.internal.util.lang.GridFunc.nullifyEmpty;
-
-/**
- * The class is to be instantiated as a Spring beans, so it must have public zero-arg constructor.
- * The class is serializable as it will be transferred over the network as a part of {@link IgfsPaths} object.
- */
-public class DefaultHadoopFileSystemFactory implements HadoopFileSystemFactory<FileSystem>, Externalizable, LifecycleAware {
-    /** Configuration of the secondary filesystem, never null. */
-    protected final Configuration cfg = HadoopUtils.safeCreateConfiguration();
-
-    /** */
-    private URI uri;
-
-    /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
-    private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
-        new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
-            @Override public FileSystem createValue(String key) {
-                try {
-                    assert !F.isEmpty(key);
-
-                    return createFileSystem(key);
-                }
-                catch (IOException ioe) {
-                    throw new IgniteException(ioe);
-                }
-            }
-        }
-    );
-
-    public DefaultHadoopFileSystemFactory() {
-        //
-    }
-
-    @Override public FileSystem get(String userName) throws IOException {
-        return fileSysLazyMap.getOrCreate(userName);
-    }
-
-    public void setUri(URI uri) {
-        this.uri = uri;
-    }
-
-    /**
-     * Convenience mathod, analog of {@link #setUri(URI)} with String type argument.
-     * @param uriStr
-     */
-    public void setUri(String uriStr) {
-        try {
-            setUri(new URI(uriStr));
-        }
-        catch (URISyntaxException use) {
-            throw new IgniteException(use);
-        }
-    }
-
-    /**
-     * Configuration(s) setter, to be invoked from Spring config.
-     * @param cfgPaths
-     */
-    public void setCfgPaths(Collection<String> cfgPaths) {
-        cfgPaths = nullifyEmpty(cfgPaths);
-
-        if (cfgPaths == null)
-            return;
-
-        for (String confPath: cfgPaths) {
-            confPath = nullifyEmpty(confPath);
-
-            if (confPath != null) {
-                URL url = U.resolveIgniteUrl(confPath);
-
-                if (url == null) {
-                    // If secConfPath is given, it should be resolvable:
-                    throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " +
-                        "(ensure that it exists locally and you have read access to it): " + confPath);
-                }
-
-                cfg.addResource(url);
-            }
-        }
-    }
-
-    protected void init() throws IOException {
-        String secUri = nullifyEmpty(uri == null ? null : uri.toString());
-
-        A.ensure(cfg != null, "config");
-
-        // if secondary fs URI is not given explicitly, try to get it from the configuration:
-        if (secUri == null)
-            uri = FileSystem.getDefaultUri(cfg);
-        else {
-            try {
-                uri = new URI(secUri);
-            }
-            catch (URISyntaxException use) {
-                throw new IOException("Failed to resolve secondary file system URI: " + secUri);
-            }
-        }
-
-        // Disable caching:
-        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme());
-
-        cfg.setBoolean(prop, true);
-    }
-
-    /**
-     * @return {@link org.apache.hadoop.fs.FileSystem}  instance for this secondary Fs.
-     * @throws IOException
-     */
-    protected FileSystem createFileSystem(String userName) throws IOException {
-        userName = IgfsUtils.fixUserName(nullifyEmpty(userName));
-
-        final FileSystem fileSys;
-
-        try {
-            fileSys = FileSystem.get(uri, cfg, userName);
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IOException("Failed to create file system due to interrupt.", e);
-        }
-
-        return fileSys;
-    }
-
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        cfg.write(out);
-
-        U.writeString(out, uri.toString());
-    }
-
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        cfg.clear();
-
-        cfg.readFields(in);
-
-        String uriStr = U.readString(in);
-
-        try {
-            uri = new URI(uriStr);
-        }
-        catch (URISyntaxException use) {
-            throw new IOException(use);
-        }
-    }
-
-    @Override public void start() throws IgniteException {
-        try {
-            init();
-        }
-        catch (IOException ice) {
-            throw new IgniteException(ice);
-        }
-    }
-
-    @Override public void stop() throws IgniteException {
-        try {
-            fileSysLazyMap.close();
-        }
-        catch (IgniteCheckedException ice) {
-            throw new IgniteException(ice);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index 89eaf73..58b5120 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -57,6 +57,10 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> {
         assert getClass().getClassLoader() == Ignite.class.getClassLoader();
     }
 
+    public int size () {
+        return map.size();
+    }
+
     /**
      * Gets cached or creates a new value of V.
      * Never returns null.

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index 7e5ef39..1ce0492 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.Field;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayDeque;
 import java.util.Arrays;
@@ -63,7 +62,7 @@ import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
 import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.processors.hadoop.fs.DefaultHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.v1.DefaultHadoopFileSystemFactory;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEx;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsIpcIo;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutProc;
@@ -385,16 +384,16 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
 
         if (mode != PRIMARY) {
             DefaultHadoopFileSystemFactory fac = new DefaultHadoopFileSystemFactory();
+
             fac.setUri(SECONDARY_URI);
-            fac.setCfgPaths(Collections.singletonList(SECONDARY_CFG_PATH));
+            fac.setConfigPaths(Collections.singletonList(SECONDARY_CFG_PATH));
 
             IgniteHadoopIgfsSecondaryFileSystem sec = new IgniteHadoopIgfsSecondaryFileSystem();
 
             sec.setFsFactory(fac);
             sec.setDfltUserName(SECONDARY_FS_USER);
 
-            sec.start();
-
+            // NB: start() will be invoked upon IgfsImpl init.
             cfg.setSecondaryFileSystem(sec);
         }
 
@@ -412,7 +411,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
             @Override public Object call() throws Exception {
                 return new IgniteHadoopFileSystem().getUri();
             }
-        }, IllegalStateException.class, "URI is null (was IgniteHadoopFileSystem properly initialized?).");
+        }, IllegalStateException.class,
+            "URI is null (was IgniteHadoopFileSystem properly initialized?) [closed=false]");
     }
 
     /** @throws Exception If failed. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java
index 20c2bd2..d8cf74c 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java
@@ -60,8 +60,6 @@ public abstract class IgniteHadoopFileSystemShmemAbstractSelfTest extends Ignite
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     public void testOutOfResources() throws Exception {
-        if (1 == 1) return;
-
         final Collection<IpcEndpoint> eps = new LinkedList<>();
 
         try {

Reply via email to