IGNITE-2206: intermediate saving commit.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89690022 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89690022 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89690022 Branch: refs/heads/ignite-2206 Commit: 89690022863a32fb00e76335c02c6a36bfae55e1 Parents: ba11b3a Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Tue Dec 22 22:51:07 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Tue Dec 22 22:51:07 2015 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopPayloadAware.java | 12 ++ .../processors/hadoop/PayloadAware.java | 13 -- .../internal/processors/igfs/IgfsImpl.java | 12 +- .../internal/processors/igfs/IgfsPaths.java | 37 +--- .../hadoop/fs/HadoopFileSystemFactory.java | 29 --- .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 19 +- .../fs/v1/DefaultHadoopFileSystemFactory.java | 185 ++++++++++++++++++ .../hadoop/fs/v1/HadoopFileSystemFactory.java | 21 +++ .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 27 ++- .../fs/v2/HadoopAbstractFileSystemFactory.java | 21 +++ .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 7 +- .../fs/DefaultHadoopFileSystemFactory.java | 187 ------------------- .../hadoop/fs/HadoopLazyConcurrentMap.java | 4 + .../IgniteHadoopFileSystemAbstractSelfTest.java | 12 +- ...teHadoopFileSystemShmemAbstractSelfTest.java | 2 - 15 files changed, 284 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java new file mode 100644 index 0000000..dcb163f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java @@ -0,0 +1,12 @@ +package org.apache.ignite.internal.processors.hadoop; + +/** + * + */ +public interface HadoopPayloadAware { + /** + * + * @return + */ + public Object getPayload(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/PayloadAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/PayloadAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/PayloadAware.java deleted file mode 100644 index dc4ff5e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/PayloadAware.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.apache.ignite.internal.processors.hadoop; - -/** - * Created by ivan on 22.12.15. - */ -public interface PayloadAware <P> { - - /** - * - * @return - */ - public P getPayload(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index fb93ea1..7453e15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs; import java.io.IOException; import java.io.OutputStream; -import java.io.Serializable; import java.net.URI; import java.util.ArrayList; import java.util.Collection; @@ -73,7 +72,7 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; -import org.apache.ignite.internal.processors.hadoop.PayloadAware; +import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -125,7 +124,7 @@ public final class IgfsImpl implements IgfsEx { static final Map<String, String> DFLT_DIR_META = F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL); /** Handshake message. */ - private final IgfsPaths<Serializable> secondaryPaths; + private final IgfsPaths secondaryPaths; /** Cache based structure (meta data) manager. */ private IgfsMetaManager meta; @@ -260,11 +259,10 @@ public final class IgfsImpl implements IgfsEx { modeRslvr = new IgfsModeResolver(dfltMode, modes); - Serializable secondaryFsPayload = null; + Object secondaryFsPayload = null; - if (secondaryFs instanceof PayloadAware) { - secondaryFsPayload = ((PayloadAware<Serializable>) secondaryFs).getPayload(); - } + if (secondaryFs instanceof HadoopPayloadAware) + secondaryFsPayload = ((HadoopPayloadAware) secondaryFs).getPayload(); secondaryPaths = new IgfsPaths( //secondaryFs == null ? null : secondaryFs.properties(), http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java index d434d01..83451db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java @@ -25,12 +25,10 @@ import java.io.ObjectInput; import java.io.ObjectInputStream; import java.io.ObjectOutput; import java.io.ObjectOutputStream; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.processors.hadoop.PayloadAware; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -38,16 +36,12 @@ import org.jetbrains.annotations.Nullable; /** * Description of path modes. */ -public class IgfsPaths <P extends Serializable> implements Externalizable, PayloadAware<P> { +public class IgfsPaths implements Externalizable { /** */ private static final long serialVersionUID = 0L; -// /** Additional secondary file system properties. */ -// @Deprecated -// private Map<String, String> props; - /** */ - private P payload; + private Object payload; /** Default IGFS mode. */ private IgfsMode dfltMode; @@ -68,26 +62,14 @@ public class IgfsPaths <P extends Serializable> implements Externalizable, Paylo * @param dfltMode Default IGFS mode. * @param pathModes Path modes. */ - public IgfsPaths(//Map<String, String> props, - P payload, + public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes) { - //this.props = props; this.payload = payload; this.dfltMode = dfltMode; this.pathModes = pathModes; } -// /** -// * @return Secondary file system properties. -// * -// * @deprecated -// */ -// @Deprecated -// public Map<String, String> properties() { -// return props; -// } - /** * @return Default IGFS mode. */ @@ -102,15 +84,6 @@ public class IgfsPaths <P extends Serializable> implements Externalizable, Paylo return pathModes; } -// /** -// * Getter for factory. -// * -// * @return The factory. -// */ -// public HadoopFileSystemFactory<F> factory() { -// return factory; -// } - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { // U.writeStringMap(out, props); @@ -188,14 +161,14 @@ public class IgfsPaths <P extends Serializable> implements Externalizable, Paylo ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(factoryBytes)); try { - payload = (P) oi.readObject(); + payload = oi.readObject(); } finally { oi.close(); } } - @Override public P getPayload() { + public Object getPayload() { return payload; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java deleted file mode 100644 index 5337f12..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.apache.ignite.hadoop.fs; - -import java.io.IOException; -import java.io.Serializable; - -/** - * This factory is {@link Serializable} because it should be transferable over the network. - * - * @param <T> The type - */ -public interface HadoopFileSystemFactory <T> extends Serializable { - /** - * Gets the file system, possibly creating it or taking a cached instance. - * All the other data needed for the file system creation are expected to be contained - * in this object instance. - * - * @param userName The user name - * @return The file system. - * @throws IOException On error. - */ - public T get(String userName) throws IOException; - -// /** -// * Getter for the file system URI. -// * -// * @return The file system URI. -// */ -// public URI uri(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index 7ba136b..86ed7a0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.v1.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsFile; @@ -45,8 +46,8 @@ import org.apache.ignite.igfs.IgfsPathNotFoundException; import org.apache.ignite.igfs.IgfsUserContext; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.hadoop.PayloadAware; -import org.apache.ignite.internal.processors.hadoop.fs.DefaultHadoopFileSystemFactory; +import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; +import org.apache.ignite.hadoop.fs.v1.DefaultHadoopFileSystemFactory; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; @@ -69,7 +70,7 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME; * see {@link IgfsUserContext#currentUser()}. */ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, - LifecycleAware, PayloadAware<HadoopFileSystemFactory<FileSystem>> { + LifecycleAware, HadoopPayloadAware { // /** Properties of file system, see {@link #properties()} // * */ // private final Map<String, String> props = new HashMap<>(); @@ -97,7 +98,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys private String dfltUserName = IgfsUtils.fixUserName(null); /** */ - private HadoopFileSystemFactory<FileSystem> fsFactory; + private HadoopFileSystemFactory fsFactory; private final AtomicBoolean started = new AtomicBoolean(); @@ -174,20 +175,18 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys fac.setUri(uri); if (cfgPath != null) - fac.setCfgPaths(Collections.singletonList(cfgPath)); + fac.setConfigPaths(Collections.singletonList(cfgPath)); setFsFactory(fac); setDfltUserName(userName); - - start(); } /** * * @param factory */ - public void setFsFactory(HadoopFileSystemFactory<FileSystem> factory) { + public void setFsFactory(HadoopFileSystemFactory factory) { A.ensure(factory != null, "Factory value must not be null."); this.fsFactory = factory; @@ -600,7 +599,6 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** * Should be invoked by client (from Spring?) after all the setters invoked. - * TODO: how this should be invoked? * * @throws IgniteCheckedException */ @@ -609,6 +607,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys A.ensure(fsFactory != null, "factory"); A.ensure(dfltUserName != null, "dfltUserName"); + // Avoid if (started.compareAndSet(false, true)) { if (fsFactory instanceof LifecycleAware) ((LifecycleAware) fsFactory).start(); @@ -638,7 +637,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys close(); } - @Override public HadoopFileSystemFactory<FileSystem> getPayload() { + @Override public HadoopFileSystemFactory getPayload() { return fsFactory; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java new file mode 100644 index 0000000..ba4bfdd --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java @@ -0,0 +1,185 @@ +package org.apache.ignite.hadoop.fs.v1; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.hadoop.HadoopUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; +import org.apache.ignite.internal.processors.igfs.IgfsPaths; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; + +import static org.apache.ignite.internal.util.lang.GridFunc.nullifyEmpty; + +/** + * The class is to be instantiated as a Spring beans, so it must have public zero-arg constructor. + * The class is serializable as it will be transferred over the network as a part of {@link IgfsPaths} object. + */ +public class DefaultHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware { + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + private final transient HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() { + @Override public FileSystem createValue(String key) { + try { + assert !F.isEmpty(key); + + return createFileSystem(key); + } + catch (IOException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + + /** Configuration of the secondary filesystem, never null. */ + protected transient Configuration cfg; + + /** */ + protected transient URI uri; + + /** */ + protected String uriStr; + + /** */ + protected List<String> cfgPathStr; + + int getCount = 0; + + /** + * + */ + public DefaultHadoopFileSystemFactory() { + // + + + + } + + @Override public FileSystem get(String userName) throws IOException { + A.ensure(cfg != null, "cfg"); + + if (getCount == 0) + assert fileSysLazyMap.size() == 0; + + getCount++; + + return fileSysLazyMap.getOrCreate(userName); + } + + /** + * Uri setter. + * @param uriStr + */ + public void setUri(String uriStr) { + this.uriStr = uriStr; + } + + /** + * Configuration(s) setter, to be invoked from Spring config. + * @param cfgPaths + */ + public void setConfigPaths(List<String> cfgPaths) { + this.cfgPathStr = (List)nullifyEmpty(cfgPaths); + } + + /** + * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. + * @throws IOException + */ + protected FileSystem createFileSystem(String userName) throws IOException { + userName = IgfsUtils.fixUserName(nullifyEmpty(userName)); + + assert cfg != null; + + final FileSystem fileSys; + + try { + fileSys = FileSystem.get(uri, cfg, userName); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + + return fileSys; + } + + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, uriStr); + + U.writeCollection(out, cfgPathStr); + } + + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + uriStr = U.readString(in); + + cfgPathStr = new ArrayList(U.readCollection(in)); + } + + @Override public void start() throws IgniteException { + cfg = HadoopUtils.safeCreateConfiguration(); + + if (cfgPathStr != null) { + for (String confPath : cfgPathStr) { + confPath = nullifyEmpty(confPath); + + if (confPath != null) { + URL url = U.resolveIgniteUrl(confPath); + + if (url == null) { + // If secConfPath is given, it should be resolvable: + throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " + + + "(ensure that it exists locally and you have read access to it): " + confPath); + } + + cfg.addResource(url); + } + } + } + + // if secondary fs URI is not given explicitly, try to get it from the configuration: + if (uriStr == null) + uri = FileSystem.getDefaultUri(cfg); + else { + try { + uri = new URI(uriStr); + } + catch (URISyntaxException use) { + throw new IgniteException("Failed to resolve secondary file system URI: " + uriStr); + } + } + + assert uriStr != null; + + // Disable caching: + String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme()); + + cfg.setBoolean(prop, true); + } + + @Override public void stop() throws IgniteException { + try { + fileSysLazyMap.close(); + } + catch (IgniteCheckedException ice) { + throw new IgniteException(ice); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java new file mode 100644 index 0000000..c1c7b9d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java @@ -0,0 +1,21 @@ +package org.apache.ignite.hadoop.fs.v1; + +import java.io.IOException; +import java.io.Serializable; +import org.apache.hadoop.fs.FileSystem; + +/** + * This factory is {@link Serializable} because it should be transferable over the network. + */ +public interface HadoopFileSystemFactory extends Serializable { + /** + * Gets the file system, possibly creating it or taking a cached instance. + * All the other data needed for the file system creation are expected to be contained + * in this object instance. + * + * @param userName The user name + * @return The file system. + * @throws IOException On error. + */ + public FileSystem get(String userName) throws IOException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 932e326..355892e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -18,12 +18,9 @@ package org.apache.ignite.hadoop.fs.v1; import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectInputStream; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; @@ -47,7 +44,6 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsFile; @@ -71,6 +67,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE; @@ -190,7 +187,8 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @Override public URI getUri() { if (uri == null) - throw new IllegalStateException("URI is null (was IgniteHadoopFileSystem properly initialized?)."); + throw new IllegalStateException("URI is null (was IgniteHadoopFileSystem properly initialized?) [closed=" + + closeGuard.get() + ']'); return uri; } @@ -243,6 +241,8 @@ public class IgniteHadoopFileSystem extends FileSystem { @Override public void initialize(URI name, Configuration cfg) throws IOException { enterBusy(); + assert !closeGuard.get(); + try { if (rmtClient != null) throw new IOException("File system is already initialized: " + rmtClient); @@ -295,7 +295,7 @@ public class IgniteHadoopFileSystem extends FileSystem { igfsGrpBlockSize = handshake.blockSize(); - final IgfsPaths<HadoopFileSystemFactory<FileSystem>> paths = handshake.secondaryPaths(); + final IgfsPaths paths = handshake.secondaryPaths(); // Initialize client logger. Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); @@ -336,16 +336,15 @@ public class IgniteHadoopFileSystem extends FileSystem { // byte[] secFsFacoryBytes = handshake.getSecondaryFileSystemFactoryBytes(); - HadoopFileSystemFactory<FileSystem> factory = paths.getPayload(); + HadoopFileSystemFactory factory = (HadoopFileSystemFactory)paths.getPayload(); A.ensure(factory != null, "Secondary file system factory should not be null."); - //secondaryUri = factory.uri(); + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).start(); try { - //SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); - - secondaryFs = factory.get(user); //secProvider.createFileSystem(user); + secondaryFs = factory.get(user); secondaryUri = secondaryFs.getUri(); @@ -440,12 +439,12 @@ public class IgniteHadoopFileSystem extends FileSystem { if (clientLog.isLogEnabled()) clientLog.close(); - if (secondaryFs != null) - U.closeQuiet(secondaryFs); + U.closeQuiet(secondaryFs); + + System.out.println("closed " + uri); // Reset initialized resources. uri = null; - System.out.println("uri zeroed."); rmtClient = null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopAbstractFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopAbstractFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopAbstractFileSystemFactory.java new file mode 100644 index 0000000..cf81e57 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopAbstractFileSystemFactory.java @@ -0,0 +1,21 @@ +package org.apache.ignite.hadoop.fs.v2; + +import java.io.IOException; +import java.io.Serializable; +import org.apache.hadoop.fs.AbstractFileSystem; + +/** + * This factory is {@link Serializable} because it should be transferable over the network. + */ +interface HadoopAbstractFileSystemFactory extends Serializable { + /** + * Gets the file system, possibly creating it or taking a cached instance. + * All the other data needed for the file system creation are expected to be contained + * in this object instance. + * + * @param userName The user name + * @return The file system. + * @throws IOException On error. + */ + public AbstractFileSystem get(String userName) throws IOException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index d3267c7..96f97dc 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -52,7 +52,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsMode; @@ -300,7 +299,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea grpBlockSize = handshake.blockSize(); - IgfsPaths<HadoopFileSystemFactory<AbstractFileSystem>> paths = handshake.secondaryPaths(); + IgfsPaths paths = handshake.secondaryPaths(); Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); @@ -338,8 +337,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea // String secUri = props.get(SECONDARY_FS_URI); // String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - HadoopFileSystemFactory<AbstractFileSystem> factory - = (HadoopFileSystemFactory<AbstractFileSystem>)paths.getPayload(); + HadoopAbstractFileSystemFactory factory + = (HadoopAbstractFileSystemFactory)paths.getPayload(); A.ensure(secondaryUri != null, "File system factory uri should not be null."); http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java deleted file mode 100644 index bee0f25..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java +++ /dev/null @@ -1,187 +0,0 @@ -package org.apache.ignite.internal.processors.hadoop.fs; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.Collection; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.igfs.IgfsPaths; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lifecycle.LifecycleAware; - -import static org.apache.ignite.internal.util.lang.GridFunc.nullifyEmpty; - -/** - * The class is to be instantiated as a Spring beans, so it must have public zero-arg constructor. - * The class is serializable as it will be transferred over the network as a part of {@link IgfsPaths} object. - */ -public class DefaultHadoopFileSystemFactory implements HadoopFileSystemFactory<FileSystem>, Externalizable, LifecycleAware { - /** Configuration of the secondary filesystem, never null. */ - protected final Configuration cfg = HadoopUtils.safeCreateConfiguration(); - - /** */ - private URI uri; - - /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ - private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( - new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() { - @Override public FileSystem createValue(String key) { - try { - assert !F.isEmpty(key); - - return createFileSystem(key); - } - catch (IOException ioe) { - throw new IgniteException(ioe); - } - } - } - ); - - public DefaultHadoopFileSystemFactory() { - // - } - - @Override public FileSystem get(String userName) throws IOException { - return fileSysLazyMap.getOrCreate(userName); - } - - public void setUri(URI uri) { - this.uri = uri; - } - - /** - * Convenience mathod, analog of {@link #setUri(URI)} with String type argument. - * @param uriStr - */ - public void setUri(String uriStr) { - try { - setUri(new URI(uriStr)); - } - catch (URISyntaxException use) { - throw new IgniteException(use); - } - } - - /** - * Configuration(s) setter, to be invoked from Spring config. - * @param cfgPaths - */ - public void setCfgPaths(Collection<String> cfgPaths) { - cfgPaths = nullifyEmpty(cfgPaths); - - if (cfgPaths == null) - return; - - for (String confPath: cfgPaths) { - confPath = nullifyEmpty(confPath); - - if (confPath != null) { - URL url = U.resolveIgniteUrl(confPath); - - if (url == null) { - // If secConfPath is given, it should be resolvable: - throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " + - "(ensure that it exists locally and you have read access to it): " + confPath); - } - - cfg.addResource(url); - } - } - } - - protected void init() throws IOException { - String secUri = nullifyEmpty(uri == null ? null : uri.toString()); - - A.ensure(cfg != null, "config"); - - // if secondary fs URI is not given explicitly, try to get it from the configuration: - if (secUri == null) - uri = FileSystem.getDefaultUri(cfg); - else { - try { - uri = new URI(secUri); - } - catch (URISyntaxException use) { - throw new IOException("Failed to resolve secondary file system URI: " + secUri); - } - } - - // Disable caching: - String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme()); - - cfg.setBoolean(prop, true); - } - - /** - * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. - * @throws IOException - */ - protected FileSystem createFileSystem(String userName) throws IOException { - userName = IgfsUtils.fixUserName(nullifyEmpty(userName)); - - final FileSystem fileSys; - - try { - fileSys = FileSystem.get(uri, cfg, userName); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IOException("Failed to create file system due to interrupt.", e); - } - - return fileSys; - } - - @Override public void writeExternal(ObjectOutput out) throws IOException { - cfg.write(out); - - U.writeString(out, uri.toString()); - } - - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cfg.clear(); - - cfg.readFields(in); - - String uriStr = U.readString(in); - - try { - uri = new URI(uriStr); - } - catch (URISyntaxException use) { - throw new IOException(use); - } - } - - @Override public void start() throws IgniteException { - try { - init(); - } - catch (IOException ice) { - throw new IgniteException(ice); - } - } - - @Override public void stop() throws IgniteException { - try { - fileSysLazyMap.close(); - } - catch (IgniteCheckedException ice) { - throw new IgniteException(ice); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java index 89eaf73..58b5120 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -57,6 +57,10 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> { assert getClass().getClassLoader() == Ignite.class.getClassLoader(); } + public int size () { + return map.size(); + } + /** * Gets cached or creates a new value of V. * Never returns null. http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index 7e5ef39..1ce0492 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Field; import java.net.URI; -import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.ArrayDeque; import java.util.Arrays; @@ -63,7 +62,7 @@ import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.internal.processors.hadoop.fs.DefaultHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.v1.DefaultHadoopFileSystemFactory; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEx; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsIpcIo; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutProc; @@ -385,16 +384,16 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA if (mode != PRIMARY) { DefaultHadoopFileSystemFactory fac = new DefaultHadoopFileSystemFactory(); + fac.setUri(SECONDARY_URI); - fac.setCfgPaths(Collections.singletonList(SECONDARY_CFG_PATH)); + fac.setConfigPaths(Collections.singletonList(SECONDARY_CFG_PATH)); IgniteHadoopIgfsSecondaryFileSystem sec = new IgniteHadoopIgfsSecondaryFileSystem(); sec.setFsFactory(fac); sec.setDfltUserName(SECONDARY_FS_USER); - sec.start(); - + // NB: start() will be invoked upon IgfsImpl init. cfg.setSecondaryFileSystem(sec); } @@ -412,7 +411,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA @Override public Object call() throws Exception { return new IgniteHadoopFileSystem().getUri(); } - }, IllegalStateException.class, "URI is null (was IgniteHadoopFileSystem properly initialized?)."); + }, IllegalStateException.class, + "URI is null (was IgniteHadoopFileSystem properly initialized?) [closed=false]"); } /** @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/89690022/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java index 20c2bd2..d8cf74c 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java @@ -60,8 +60,6 @@ public abstract class IgniteHadoopFileSystemShmemAbstractSelfTest extends Ignite */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public void testOutOfResources() throws Exception { - if (1 == 1) return; - final Collection<IpcEndpoint> eps = new LinkedList<>(); try {