IGNITE-2206: cleanup by review results.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/83e12492 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/83e12492 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/83e12492 Branch: refs/heads/ignite-2206 Commit: 83e12492e6c792828a4eb1eb2b90cbd66081a770 Parents: 5a4586e Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Wed Dec 23 16:46:35 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Wed Dec 23 16:46:35 2015 +0300 ---------------------------------------------------------------------- .../binary/BinaryMarshallerSelfTest.java | 82 +++++----- .../org/apache/ignite/hadoop/HadoopFsIssue.java | 71 --------- .../fs/CachingHadoopFileSystemFactory.java | 48 +++--- .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 70 ++++----- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 90 ++++++----- .../hadoop/fs/v2/HadoopV2FileSystemFactory.java | 11 -- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 27 +--- .../KerberosSecondaryFileSystemProvider.java | 55 ------- .../hadoop/SecondaryFileSystemProvider.java | 151 ------------------- .../hadoop/fs/HadoopLazyConcurrentMap.java | 10 +- ...oopSecondaryFileSystemConfigurationTest.java | 15 +- .../IgniteHadoopFileSystemAbstractSelfTest.java | 2 +- .../testsuites/IgniteHadoopTestSuite.java | 2 +- .../cache/IgniteCacheAbstractQuerySelfTest.java | 2 +- parent/pom.xml | 2 +- 15 files changed, 177 insertions(+), 461 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java index 9f7beb8..ac9771f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java @@ -17,6 +17,40 @@ package org.apache.ignite.internal.binary; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.net.InetSocketAddress; +import java.sql.Timestamp; +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import junit.framework.Assert; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryCollectionFactory; @@ -53,43 +87,9 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import sun.misc.Unsafe; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.net.InetSocketAddress; -import java.sql.Timestamp; -import java.util.AbstractQueue; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; - -import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.*; -import static org.junit.Assert.*; +import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.INSTANCE; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotEquals; /** * Binary marshaller tests. @@ -2339,8 +2339,8 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest { // Check direct field access. assertNull(objBin.field(fieldName)); - assertEquals(Integer.valueOf(1), objBin.field(fieldNameA)); - assertEquals(Integer.valueOf(2), objBin.field(fieldNameB)); + assertEquals(1, objBin.field(fieldNameA)); + assertEquals(2, objBin.field(fieldNameB)); // Check metadata. BinaryType type = objBin.type(); @@ -2363,8 +2363,8 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest { assert fieldB.exists(objBin); assertNull(field.value(objBin)); - assertEquals(Integer.valueOf(1), fieldA.value(objBin)); - assertEquals(Integer.valueOf(2), fieldB.value(objBin)); + assertEquals(1, fieldA.value(objBin)); + assertEquals(2, fieldB.value(objBin)); // Check object deserialization. DuplicateFieldsB deserialized = objBin.deserialize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java deleted file mode 100644 index 82314f1..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; - -/** - * Comment. - */ -public class HadoopFsIssue { - /** - * - * @param args - */ - public static void main(String args[]) { - String uri = null; - String cfgPath = null; - String user = null; - - for (String arg : args) { - if (arg.startsWith("uri=")) - uri = arg.split("=")[1].trim(); - else if (arg.startsWith("cfg=")) - cfgPath = arg.split("=")[1].trim(); - else if (arg.startsWith("user=")) - user = arg.split("=")[1].trim(); - else - throw new IllegalArgumentException("Unknown argument:" + arg); - } - - System.out.println("Connecting to HDFS with the following settings [uri=" + uri + ", cfg=" + cfgPath + ", user=" + user + ']'); - - try { - SecondaryFileSystemProvider provider = new SecondaryFileSystemProvider(uri, cfgPath); - - FileSystem fs = provider.createFileSystem(user); - - RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/tmp"), true); - - System.out.println("Got the iterator"); - - while (iter.hasNext()) { - LocatedFileStatus status = iter.next(); - - System.out.println(status); - } - } - catch (Exception e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java index 1e97b30..52d4db5 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java @@ -73,50 +73,57 @@ public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory, /** */ protected List<String> cfgPathStr; - int getCount = 0; - /** - * + * Public non-arg constructor. */ public CachingHadoopFileSystemFactory() { - // - - - + // noop } + /** {@inheritDoc} */ @Override public FileSystem create(String userName) throws IOException { A.ensure(cfg != null, "cfg"); - if (getCount == 0) - assert fileSysLazyMap.size() == 0; - - getCount++; - return fileSysLazyMap.getOrCreate(userName); } - // TODO: Add getter. - /** * Uri setter. - * @param uriStr + * + * @param uriStr The URI to set. */ public void setUri(String uriStr) { this.uriStr = uriStr; } - // TODO: Add getter. + /** + * Gets the URI. + * + * @return The URI. + */ + public URI getUri() { + return uri; + } /** * Configuration(s) setter, to be invoked from Spring config. - * @param cfgPaths + * + * @param cfgPaths The config paths collection to set. */ public void setConfigPaths(List<String> cfgPaths) { this.cfgPathStr = cfgPaths; } /** + * Gets the config paths collection. + * + * @return The config paths collection. + */ + public List<String> getConfigPaths() { + return cfgPathStr; + } + + /** * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. * @throws IOException */ @@ -159,7 +166,9 @@ public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory, if (cfgPathStr != null) { for (String confPath : cfgPathStr) { - if (confPath != null) { + if (confPath == null) + throw new IgniteException("Null config path encountered."); + else { URL url = U.resolveIgniteUrl(confPath); if (url == null) { @@ -170,9 +179,6 @@ public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory, cfg.addResource(url); } - else { - // TODO: Throw exception. - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index aa1952d..9942ec4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -68,15 +68,11 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME; public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware, HadoopPayloadAware { /** The default user name. It is used if no user context is set. */ - private String dfltUsrName = IgfsUtils.fixUserName(null); + private @Nullable String dfltUsrName; /** */ private HadoopFileSystemFactory fsFactory; - /** FileSystem instance created for the default user. Stored outside due to performance reasons. */ - // TODO: Remove. - private volatile FileSystem dfltFs; - /** * Default constructor for Spring. */ @@ -90,6 +86,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @param uri URI of file system. * @throws IgniteCheckedException In case of error. */ + @Deprecated public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException { this(uri, null, null); } @@ -128,14 +125,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys fac.setConfigPaths(Collections.singletonList(cfgPath)); setFileSystemFactory(fac); - setUserName(userName); + setDefaultUserName(userName); } - // TODO: Add getter. - // TODO: Add docs. /** + * Sets secondary file system factory. * - * @param factory + * @param factory The factory to set. */ public void setFileSystemFactory(HadoopFileSystemFactory factory) { A.ensure(factory != null, "Factory value must not be null."); @@ -143,17 +139,31 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys this.fsFactory = factory; } - // TODO: Add getter. - // TODO: Add docs. - // TODO: Rename to "setDefaultUserName" + /** + * Gets the secondary file system factory. + * + * @return The secondary file system factory. + */ + public HadoopFileSystemFactory getFileSystemFactory() { + return fsFactory; + } + + /** + * Sets the default user name. + * + * @param usrName The user name to set. + */ + public void setDefaultUserName(String usrName) { + this.dfltUsrName = usrName; + } /** + * Gets the default user name. * - * @param usrName + * @return The default user name. */ - public void setUserName(String usrName) { - // TODO: Move fix to start routine. - this.dfltUsrName = IgfsUtils.fixUserName(usrName); + public String getDefaultUserName() { + return dfltUsrName; } /** @@ -486,13 +496,10 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys String user = IgfsUserContext.currentUser(); if (F.isEmpty(user)) - user = dfltUsrName; // default is never empty. + user = IgfsUtils.fixUserName(dfltUsrName); assert !F.isEmpty(user); - if (F.eq(user, dfltUsrName)) - return dfltFs; // optimization - try { return fsFactory.create(user); } @@ -513,27 +520,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void stop() throws IgniteException { - Exception e = null; - - try { - if (dfltFs != null) - dfltFs.close(); - } - catch (Exception e0) { - e = e0; - } - - try { - if (fsFactory instanceof LifecycleAware) - ((LifecycleAware)fsFactory).stop(); - } - catch (IgniteException ie) { - if (e == null) - e = ie; - } - - if (e != null) - throw new IgniteException(e); + if (fsFactory instanceof LifecycleAware) + ((LifecycleAware)fsFactory).stop(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 1546995..545c905 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -164,9 +164,8 @@ public class IgniteHadoopFileSystem extends FileSystem { /** IGFS mode resolver. */ private IgfsModeResolver modeRslvr; - // TODO: Secondary file system must be changed to factory. - /** Secondary file system instance. */ - private FileSystem secondaryFs; + /** The secondary file system factory. */ + private HadoopFileSystemFactory factory; /** Management connection flag. */ private boolean mgmt; @@ -263,7 +262,6 @@ public class IgniteHadoopFileSystem extends FileSystem { "://[name]/[optional_path], actual=" + name + ']'); uri = name; - System.out.println("uri initialized: " + uri); uriAuthority = uri.getAuthority(); @@ -331,14 +329,7 @@ public class IgniteHadoopFileSystem extends FileSystem { } if (initSecondary) { -// Map<String, String> props = paths.properties(); -// -// String secUri = props.get(SECONDARY_FS_URI); -// String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - -// byte[] secFsFacoryBytes = handshake.getSecondaryFileSystemFactoryBytes(); - - HadoopFileSystemFactory factory = (HadoopFileSystemFactory)paths.getPayload(); + factory = (HadoopFileSystemFactory)paths.getPayload(); A.ensure(factory != null, "Secondary file system factory should not be null."); @@ -346,13 +337,11 @@ public class IgniteHadoopFileSystem extends FileSystem { ((LifecycleAware) factory).start(); try { - secondaryFs = factory.create(user); + FileSystem secFs = factory.create(user); - secondaryUri = secondaryFs.getUri(); + secondaryUri = secFs.getUri(); A.ensure(secondaryUri != null, "Secondary file system uri should not be null."); - - //assert secondaryUri.equals(uri2); } catch (IOException e) { if (!mgmt) @@ -371,23 +360,6 @@ public class IgniteHadoopFileSystem extends FileSystem { } } -// /** -// * -// * @param in -// * @throws IOException -// * @throws ClassNotFoundException -// */ -// static HadoopFileSystemFactory readFactory(byte[] factoryBytes) throws IOException, ClassNotFoundException { -// ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(factoryBytes)); -// -// try { -// return (HadoopFileSystemFactory<F>) oi.readObject(); -// } -// finally { -// oi.close(); -// } -// } - /** {@inheritDoc} */ @Override protected void checkPath(Path path) { URI uri = path.toUri(); @@ -441,9 +413,8 @@ public class IgniteHadoopFileSystem extends FileSystem { if (clientLog.isLogEnabled()) clientLog.close(); - U.closeQuiet(secondaryFs); - - System.out.println("closed " + uri); + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).stop(); // Reset initialized resources. uri = null; @@ -458,6 +429,8 @@ public class IgniteHadoopFileSystem extends FileSystem { A.notNull(p, "p"); if (mode(p) == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -486,6 +459,8 @@ public class IgniteHadoopFileSystem extends FileSystem { A.notNull(p, "p"); if (mode(p) == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -515,6 +490,8 @@ public class IgniteHadoopFileSystem extends FileSystem { try { if (mode(p) == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -544,6 +521,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = mode(path); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -616,6 +595,8 @@ public class IgniteHadoopFileSystem extends FileSystem { path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -697,6 +678,8 @@ public class IgniteHadoopFileSystem extends FileSystem { ", path=" + path + ", bufSize=" + bufSize + ']'); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -760,6 +743,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = mode(srcPath); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -820,6 +805,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = mode(path); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -865,6 +852,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = mode(path); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -929,6 +918,8 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @Override public void setWorkingDirectory(Path newPath) { + final FileSystem secondaryFs = secondaryFs(); + if (newPath == null) { Path homeDir = getHomeDirectory(); @@ -969,6 +960,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = mode(path); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -1010,6 +1003,8 @@ public class IgniteHadoopFileSystem extends FileSystem { try { if (mode(f) == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -1040,6 +1035,8 @@ public class IgniteHadoopFileSystem extends FileSystem { try { if (mode(f) == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -1071,6 +1068,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsPath path = convert(status.getPath()); if (mode(status.getPath()) == PROXY) { + final FileSystem secondaryFs = secondaryFs(); + if (secondaryFs == null) { assert mgmt; @@ -1149,7 +1148,7 @@ public class IgniteHadoopFileSystem extends FileSystem { * @return Secondary file system path. */ private Path toSecondary(Path path) { - assert secondaryFs != null; + assert factory != null; assert secondaryUri != null; return convertPath(path, secondaryUri); @@ -1324,4 +1323,21 @@ public class IgniteHadoopFileSystem extends FileSystem { public String user() { return user; } + + /** + * Gets cached or creates a {@link FileSystem}. + * + * @return The secondary file system. + */ + private @Nullable FileSystem secondaryFs() { + if (factory == null) + return null; + + try { + return factory.create(user); + } + catch (IOException ioe) { + throw new IgniteException(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java deleted file mode 100644 index c2ab620..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java +++ /dev/null @@ -1,11 +0,0 @@ -//package org.apache.ignite.hadoop.fs.v2; -// -//import org.apache.hadoop.fs.AbstractFileSystem; -//import org.apache.hadoop.fs.FileSystem; -// -///** -// * Created by ivan on 18.12.15. -// */ -//public interface HadoopV2FileSystemFactory { -// AbstractFileSystem create(String uri, String configPath, String userName); -//} http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 96f97dc..d665d4c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE; @@ -168,6 +169,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** Secondary file system instance. */ private AbstractFileSystem secondaryFs; + /** Secondary file system factory. */ + private HadoopAbstractFileSystemFactory factory; + /** Whether custom sequential reads before prefetch value is provided. */ private boolean seqReadsBeforePrefetchOverride; @@ -332,32 +336,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea } if (initSecondary) { -// Map<String, String> props = paths.properties(); -// -// String secUri = props.get(SECONDARY_FS_URI); -// String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - - HadoopAbstractFileSystemFactory factory - = (HadoopAbstractFileSystemFactory)paths.getPayload(); + factory = (HadoopAbstractFileSystemFactory)paths.getPayload(); A.ensure(secondaryUri != null, "File system factory uri should not be null."); - //secondaryUri = factory.uri(); - try { - //SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); - secondaryFs = factory.get(user); secondaryUri = secondaryFs.getUri(); - -// assert secondaryUri != null; -// -// URI uri2 = ((DefaultHadoopFileSystemFactory)factory).uri(); -// assert secondaryUri.equals(uri2); - - //secondaryFs = secProvider.createAbstractFileSystem(user); - //secondaryUri = secProvider.uri(); } catch (IOException e) { throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); @@ -380,6 +366,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea if (clientLog.isLogEnabled()) clientLog.close(); + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).stop(); + // Reset initialized resources. rmtClient = null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java deleted file mode 100644 index 503ac46..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.UserGroupInformation; -import org.jetbrains.annotations.Nullable; - -/** - * See https://issues.apache.org/jira/browse/IGNITE-2195 . - */ -public class KerberosSecondaryFileSystemProvider extends SecondaryFileSystemProvider { - /** - * Constructor. - **/ - public KerberosSecondaryFileSystemProvider(@Nullable String secUri, @Nullable String secConfPath) throws IOException { - super(secUri, secConfPath); - } - - /** {@inheritDoc} */ - @Override public FileSystem createFileSystem(String userName) throws IOException { - UserGroupInformation.setConfiguration(cfg); - - UserGroupInformation ugi = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getCurrentUser()); - - try { - return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { - @Override public FileSystem run() throws Exception { - return FileSystem.get(uri, cfg); - } - }); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IOException("Failed to create file system due to interrupt.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java deleted file mode 100644 index 1e7ac7f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.security.PrivilegedExceptionAction; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.AbstractFileSystem; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.IgniteUtils; -import static org.apache.ignite.internal.util.typedef.F.*; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -/** - * Encapsulates logic of secondary filesystem creation. - */ -public class SecondaryFileSystemProvider { - /** Configuration of the secondary filesystem, never null. */ - protected final Configuration cfg = HadoopUtils.safeCreateConfiguration(); - - /** The secondary filesystem URI, never null. */ - protected final URI uri; - - /** Configuration file path. */ - @Nullable protected final String confPath; - - /** - * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be - * specified either explicitly or in the configuration provided. - * - * @param secUri the secondary Fs URI (optional). If not given explicitly, it must be specified as "fs.defaultFS" - * property in the provided configuration. - * @param secConfPath the secondary Fs path (file path on the local file system, optional). - * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved. - * @throws IOException - */ - public SecondaryFileSystemProvider(@Nullable String secUri, @Nullable String secConfPath) throws IOException { - confPath = secConfPath; - - if (confPath != null) { - URL url = U.resolveIgniteUrl(confPath); - - if (url == null) { - // If secConfPath is given, it should be resolvable: - throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " + - "(ensure that it exists locally and you have read access to it): " + confPath); - } - - cfg.addResource(url); - } - - // if secondary fs URI is not given explicitly, try to get it from the configuration: - if (secUri == null) - uri = FileSystem.getDefaultUri(cfg); - else { - try { - uri = new URI(secUri); - } - catch (URISyntaxException use) { - throw new IOException("Failed to resolve secondary file system URI: " + secUri); - } - } - - // Disable caching: - String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme()); - - cfg.setBoolean(prop, true); - } - - /** - * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. - * @throws IOException - */ - public FileSystem createFileSystem(String userName) throws IOException { - userName = IgfsUtils.fixUserName(userName); - - final FileSystem fileSys; - - try { - fileSys = FileSystem.get(uri, cfg, userName); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IOException("Failed to create file system due to interrupt.", e); - } - - return fileSys; - } - - /** - * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs. - * @throws IOException in case of error. - */ - public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException { - userName = IgfsUtils.fixUserName(userName); - - String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); - - UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName); - - try { - return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() { - @Override public AbstractFileSystem run() throws IOException { - return AbstractFileSystem.get(uri, cfg); - } - }); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - - throw new IOException("Failed to create file system due to interrupt.", ie); - } - } - - /** - * @return the secondary fs URI, never null. - */ - public URI uri() { - return uri; - } - - /** - * @return The configuration path, if any. - */ - @Nullable public String configurationPath() { - return confPath; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java index 58b5120..2b20639 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -57,10 +57,6 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> { assert getClass().getClassLoader() == Ignite.class.getClassLoader(); } - public int size () { - return map.size(); - } - /** * Gets cached or creates a new value of V. * Never returns null. @@ -77,7 +73,7 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> { try { if (closed) throw new IllegalStateException("Failed to create value for key [" + k - + "]: the map is already closed."); + + "]: the map is already closed. this = " + System.identityHashCode(this)); final ValueWrapper wNew = new ValueWrapper(k); @@ -116,6 +112,10 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> { if (closed) return; + // TODO: debug: + System.out.println("##### closed: " + System.identityHashCode(this)); + Thread.dumpStack(); + closed = true; Exception err = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index 4ddfb0d..98ab317 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; +import java.util.Collections; import java.util.concurrent.Callable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -34,9 +35,9 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; import org.apache.ignite.internal.util.typedef.G; @@ -173,12 +174,16 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra else primaryConfFullPath = null; - SecondaryFileSystemProvider provider = - new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath); + CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory(); - primaryFs = provider.createFileSystem(null); + fac.setConfigPaths(Collections.singletonList(primaryConfFullPath)); + fac.setUri(primaryFsUriStr); - primaryFsUri = provider.uri(); + fac.start(); + + primaryFs = fac.create(null); //provider.createFileSystem(null); + + primaryFsUri = primaryFs.getUri(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index 310c390..d473592 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -391,7 +391,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA IgniteHadoopIgfsSecondaryFileSystem sec = new IgniteHadoopIgfsSecondaryFileSystem(); sec.setFileSystemFactory(fac); - sec.setUserName(SECONDARY_FS_USER); + sec.setDefaultUserName(SECONDARY_FS_USER); // NB: start() will be invoked upon IgfsImpl init. cfg.setSecondaryFileSystem(sec); http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index eac6bb8..0216f4b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -183,7 +183,7 @@ public class IgniteHadoopTestSuite extends TestSuite { * @throws Exception If failed. */ public static void downloadHadoop() throws Exception { - String ver = IgniteSystemProperties.getString("hadoop.version", "2.6.0"); + String ver = IgniteSystemProperties.getString("hadoop.version", "2.4.1"); X.println("Will use Hadoop version: " + ver); http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index 1507543..3782596 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -475,7 +475,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac Cache.Entry<BinaryObject, BinaryObject> entry = F.first(qry.getAll()); assertNotNull(entry); - assertEquals(Long.valueOf(100500L), entry.getKey().field("id")); + assertEquals(100500L, entry.getKey().field("id")); assertEquals(val1, entry.getValue().deserialize()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 0481088..f665d40 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -69,7 +69,7 @@ <guava14.version>14.0.1</guava14.version> <guava16.version>16.0.1</guava16.version> <h2.version>1.3.175</h2.version> - <hadoop.version>2.6.0</hadoop.version> + <hadoop.version>2.4.1</hadoop.version> <httpclient.version>4.5.1</httpclient.version> <httpcore.version>4.4.3</httpcore.version> <jackson.version>1.9.13</jackson.version>