http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java new file mode 100644 index 0000000..bb155b4 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; +import org.apache.ignite.internal.processors.igfs.IgfsImpl; +import org.apache.ignite.internal.processors.igfs.IgfsMetaManager; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Callable; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; +import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH; +import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.awaitFileClose; +import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.clear; +import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.create; + +/** + * Tests for IGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC. + */ +public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractTest { + /** IGFS block size. */ + protected static final int IGFS_BLOCK_SIZE = 512 * 1024; + + /** Amount of blocks to prefetch. */ + protected static final int PREFETCH_BLOCKS = 1; + + /** Amount of sequential block reads before prefetch is triggered. */ + protected static final int SEQ_READS_BEFORE_PREFETCH = 2; + + /** Secondary file system URI. */ + protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/"; + + /** Secondary file system configuration path. */ + protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; + + /** Primary file system URI. */ + protected static final String PRIMARY_URI = "igfs://igfs:grid@/"; + + /** Primary file system configuration path. */ + protected static final String PRIMARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback.xml"; + + /** Primary file system REST endpoint configuration map. */ + protected static final IgfsIpcEndpointConfiguration PRIMARY_REST_CFG; + + /** Secondary file system REST endpoint configuration map. */ + protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; + + /** Directory. */ + protected static final IgfsPath DIR = new IgfsPath("/dir"); + + /** Sub-directory. */ + protected static final IgfsPath SUBDIR = new IgfsPath(DIR, "subdir"); + + /** File. */ + protected static final IgfsPath FILE = new IgfsPath(SUBDIR, "file"); + + /** Default data chunk (128 bytes). */ + protected static byte[] chunk; + + /** Primary IGFS. */ + protected static IgfsImpl igfs; + + /** Secondary IGFS. */ + protected static IgfsImpl igfsSecondary; + + /** IGFS mode. */ + protected final IgfsMode mode; + + static { + PRIMARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + PRIMARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + PRIMARY_REST_CFG.setPort(10500); + + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + + /** + * Constructor. + * + * @param mode IGFS mode. + */ + protected HadoopIgfsDualAbstractSelfTest(IgfsMode mode) { + this.mode = mode; + assert mode == DUAL_SYNC || mode == DUAL_ASYNC; + } + + /** + * Start grid with IGFS. + * + * @param gridName Grid name. + * @param igfsName IGFS name + * @param mode IGFS mode. + * @param secondaryFs Secondary file system (optional). + * @param restCfg Rest configuration string (optional). + * @return Started grid instance. + * @throws Exception If failed. + */ + protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(igfsName); + igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); + igfsCfg.setDefaultMode(mode); + igfsCfg.setIpcEndpointConfiguration(restCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); + igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setNearConfiguration(null); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + return G.start(cfg); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + chunk = new byte[128]; + + for (int i = 0; i < chunk.length; i++) + chunk[i] = (byte)i; + + Ignite igniteSecondary = startGridWithIgfs("grid-secondary", "igfs-secondary", PRIMARY, null, SECONDARY_REST_CFG); + + IgfsSecondaryFileSystem hadoopFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); + + Ignite ignite = startGridWithIgfs("grid", "igfs", mode, hadoopFs, PRIMARY_REST_CFG); + + igfsSecondary = (IgfsImpl) igniteSecondary.fileSystem("igfs-secondary"); + igfs = (IgfsImpl) ignite.fileSystem("igfs"); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + clear(igfs); + clear(igfsSecondary); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + G.stopAll(true); + } + + /** + * Convenient method to group paths. + * + * @param paths Paths to group. + * @return Paths as array. + */ + protected IgfsPath[] paths(IgfsPath... paths) { + return paths; + } + + /** + * Check how prefetch override works. + * + * @throws Exception IF failed. + */ + public void testOpenPrefetchOverride() throws Exception { + create(igfsSecondary, paths(DIR, SUBDIR), paths(FILE)); + + // Write enough data to the secondary file system. + final int blockSize = IGFS_BLOCK_SIZE; + + IgfsOutputStream out = igfsSecondary.append(FILE, false); + + int totalWritten = 0; + + while (totalWritten < blockSize * 2 + chunk.length) { + out.write(chunk); + + totalWritten += chunk.length; + } + + out.close(); + + awaitFileClose(igfsSecondary, FILE); + + // Instantiate file system with overridden "seq reads before prefetch" property. + Configuration cfg = new Configuration(); + + cfg.addResource(U.resolveIgniteUrl(PRIMARY_CFG)); + + int seqReads = SEQ_READS_BEFORE_PREFETCH + 1; + + cfg.setInt(String.format(PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, "igfs:grid@"), seqReads); + + FileSystem fs = FileSystem.get(new URI(PRIMARY_URI), cfg); + + // Read the first two blocks. + Path fsHome = new Path(PRIMARY_URI); + Path dir = new Path(fsHome, DIR.name()); + Path subdir = new Path(dir, SUBDIR.name()); + Path file = new Path(subdir, FILE.name()); + + FSDataInputStream fsIn = fs.open(file); + + final byte[] readBuf = new byte[blockSize * 2]; + + fsIn.readFully(0, readBuf, 0, readBuf.length); + + // Wait for a while for prefetch to finish (if any). + IgfsMetaManager meta = igfs.context().meta(); + + IgfsEntryInfo info = meta.info(meta.fileId(FILE)); + + IgfsBlockKey key = new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 2); + + IgniteCache<IgfsBlockKey, byte[]> dataCache = igfs.context().kernalContext().cache().jcache( + igfs.configuration().getDataCacheName()); + + for (int i = 0; i < 10; i++) { + if (dataCache.containsKey(key)) + break; + else + U.sleep(100); + } + + fsIn.close(); + + // Remove the file from the secondary file system. + igfsSecondary.delete(FILE, false); + + // Try reading the third block. Should fail. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + IgfsInputStream in0 = igfs.open(FILE); + + in0.seek(blockSize * 2); + + try { + in0.read(readBuf); + } + finally { + U.closeQuiet(in0); + } + + return null; + } + }, IOException.class, + "Failed to read data due to secondary file system exception: /dir/subdir/file"); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java new file mode 100644 index 0000000..6c6e709 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; + +/** + * Tests for DUAL_ASYNC mode. + */ +public class HadoopIgfsDualAsyncSelfTest extends HadoopIgfsDualAbstractSelfTest { + /** + * Constructor. + */ + public HadoopIgfsDualAsyncSelfTest() { + super(DUAL_ASYNC); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java new file mode 100644 index 0000000..96a63d5 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; + +/** + * Tests for DUAL_SYNC mode. + */ +public class HadoopIgfsDualSyncSelfTest extends HadoopIgfsDualAbstractSelfTest { + /** + * Constructor. + */ + public HadoopIgfsDualSyncSelfTest() { + super(DUAL_SYNC); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java new file mode 100644 index 0000000..f7af6f0 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemTestAdapter; +import org.apache.ignite.internal.util.typedef.T2; + +/** + * Universal adapter wrapping {@link org.apache.hadoop.fs.FileSystem} instance. + */ +public class HadoopIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryFileSystemTestAdapter { + /** File system factory. */ + private final HadoopFileSystemFactory factory; + + /** + * Constructor. + * @param factory File system factory. + */ + public HadoopIgfsSecondaryFileSystemTestAdapter(HadoopFileSystemFactory factory) { + assert factory != null; + + this.factory = factory; + } + + /** {@inheritDoc} */ + @Override public String name() throws IOException { + return get().getUri().toString(); + } + + /** {@inheritDoc} */ + @Override public boolean exists(String path) throws IOException { + return get().exists(new Path(path)); + } + + /** {@inheritDoc} */ + @Override public boolean delete(String path, boolean recursive) throws IOException { + return get().delete(new Path(path), recursive); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(String path) throws IOException { + boolean ok = get().mkdirs(new Path(path)); + if (!ok) + throw new IOException("Failed to mkdirs: " + path); + } + + /** {@inheritDoc} */ + @Override public void format() throws IOException { + HadoopIgfsUtils.clear(get()); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties(String path) throws IOException { + Path p = new Path(path); + + FileStatus status = get().getFileStatus(p); + + Map<String,String> m = new HashMap<>(3); + + m.put(IgfsUtils.PROP_USER_NAME, status.getOwner()); + m.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup()); + m.put(IgfsUtils.PROP_PERMISSION, permission(status)); + + return m; + } + + /** {@inheritDoc} */ + @Override public String permissions(String path) throws IOException { + return permission(get().getFileStatus(new Path(path))); + } + + /** + * Get permission for file status. + * + * @param status Status. + * @return Permission. + */ + private String permission(FileStatus status) { + FsPermission perm = status.getPermission(); + + return "0" + perm.getUserAction().ordinal() + perm.getGroupAction().ordinal() + perm.getOtherAction().ordinal(); + } + + /** {@inheritDoc} */ + @Override public InputStream openInputStream(String path) throws IOException { + return get().open(new Path(path)); + } + + /** {@inheritDoc} */ + @Override public OutputStream openOutputStream(String path, boolean append) throws IOException { + Path p = new Path(path); + + if (append) + return get().append(p); + else + return get().create(p, true/*overwrite*/); + } + + /** {@inheritDoc} */ + @Override public T2<Long, Long> times(String path) throws IOException { + FileStatus status = get().getFileStatus(new Path(path)); + + return new T2<>(status.getAccessTime(), status.getModificationTime()); + } + + /** {@inheritDoc} */ + @Override public IgfsEx igfs() { + return null; + } + + /** + * Create file system. + * + * @return File system. + * @throws IOException If failed. + */ + protected FileSystem get() throws IOException { + return factory.get(FileSystemConfiguration.DFLT_USER_NAME); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java new file mode 100644 index 0000000..d9b5d66 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Callable; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; +import static org.apache.ignite.events.EventType.EVT_TASK_FAILED; +import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; +import static org.apache.ignite.igfs.IgfsMode.PROXY; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT; + +/** + * Tests secondary file system configuration. + */ +public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstractTest { + /** IGFS scheme */ + static final String IGFS_SCHEME = "igfs"; + + /** Primary file system authority. */ + private static final String PRIMARY_AUTHORITY = "igfs:grid0@"; + + /** Autogenerated secondary file system configuration path. */ + private static final String PRIMARY_CFG_PATH = "/work/core-site-primary-test.xml"; + + /** Secondary file system authority. */ + private static final String SECONDARY_AUTHORITY = "igfs_secondary:grid_secondary@127.0.0.1:11500"; + + /** Autogenerated secondary file system configuration path. */ + static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; + + /** Secondary endpoint configuration. */ + protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG; + + /** Group size. */ + public static final int GRP_SIZE = 128; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Primary file system URI. */ + protected URI primaryFsUri; + + /** Primary file system. */ + private FileSystem primaryFs; + + /** Full path of primary Fs configuration */ + private String primaryConfFullPath; + + /** Input primary Fs uri */ + private String primaryFsUriStr; + + /** Input URI scheme for configuration */ + private String primaryCfgScheme; + + /** Input URI authority for configuration */ + private String primaryCfgAuthority; + + /** if to pass configuration */ + private boolean passPrimaryConfiguration; + + /** Full path of s Fs configuration */ + private String secondaryConfFullPath; + + /** /Input URI scheme for configuration */ + private String secondaryFsUriStr; + + /** Input URI scheme for configuration */ + private String secondaryCfgScheme; + + /** Input URI authority for configuration */ + private String secondaryCfgAuthority; + + /** if to pass configuration */ + private boolean passSecondaryConfiguration; + + /** Default IGFS mode. */ + protected final IgfsMode mode; + + /** Skip embedded mode flag. */ + private final boolean skipEmbed; + + /** Skip local shmem flag. */ + private final boolean skipLocShmem; + + static { + SECONDARY_ENDPOINT_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_ENDPOINT_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_ENDPOINT_CFG.setPort(11500); + } + + /** + * Constructor. + * + * @param mode Default IGFS mode. + * @param skipEmbed Whether to skip embedded mode. + * @param skipLocShmem Whether to skip local shmem mode. + */ + protected HadoopSecondaryFileSystemConfigurationTest(IgfsMode mode, boolean skipEmbed, boolean skipLocShmem) { + this.mode = mode; + this.skipEmbed = skipEmbed; + this.skipLocShmem = skipLocShmem; + } + + /** + * Default constructor. + */ + public HadoopSecondaryFileSystemConfigurationTest() { + this(PROXY, true, false); + } + + /** + * Executes before each test. + * @throws Exception + */ + private void before() throws Exception { + initSecondary(); + + if (passPrimaryConfiguration) { + Configuration primaryFsCfg = configuration(primaryCfgScheme, primaryCfgAuthority, skipEmbed, skipLocShmem); + + primaryConfFullPath = writeConfiguration(primaryFsCfg, PRIMARY_CFG_PATH); + } + else + primaryConfFullPath = null; + + CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory(); + + fac.setConfigPaths(primaryConfFullPath); + fac.setUri(primaryFsUriStr); + + fac.start(); + + primaryFs = fac.get(null); //provider.createFileSystem(null); + + primaryFsUri = primaryFs.getUri(); + } + + /** + * Executes after each test. + * @throws Exception + */ + private void after() throws Exception { + if (primaryFs != null) { + try { + primaryFs.delete(new Path("/"), true); + } + catch (Exception ignore) { + // No-op. + } + + U.closeQuiet(primaryFs); + } + + G.stopAll(true); + + delete(primaryConfFullPath); + delete(secondaryConfFullPath); + } + + /** + * Utility method to delete file. + * + * @param file the file path to delete. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + private static void delete(String file) { + if (file != null) { + new File(file).delete(); + + assertFalse(new File(file).exists()); + } + } + + /** + * Initialize underlying secondary filesystem. + * + * @throws Exception + */ + private void initSecondary() throws Exception { + if (passSecondaryConfiguration) { + Configuration secondaryConf = configuration(secondaryCfgScheme, secondaryCfgAuthority, true, true); + + secondaryConf.setInt("fs.igfs.block.size", 1024); + + secondaryConfFullPath = writeConfiguration(secondaryConf, SECONDARY_CFG_PATH); + } + else + secondaryConfFullPath = null; + + startNodes(); + } + + /** + * Starts the nodes for this test. + * + * @throws Exception If failed. + */ + private void startNodes() throws Exception { + if (mode != PRIMARY) + startSecondary(); + + startGrids(4); + } + + /** + * Starts secondary IGFS + */ + private void startSecondary() { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("partitioned"); + igfsCfg.setMetaCacheName("replicated"); + igfsCfg.setName("igfs_secondary"); + igfsCfg.setIpcEndpointConfiguration(SECONDARY_ENDPOINT_CFG); + igfsCfg.setBlockSize(512 * 1024); + igfsCfg.setPrefetchBlocks(1); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setNearConfiguration(null); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); + cacheCfg.setBackups(0); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("grid_secondary"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setCommunicationSpi(communicationSpi()); + + G.start(cfg); + } + + /** + * Get primary IPC endpoint configuration. + * + * @param gridName Grid name. + * @return IPC primary endpoint configuration. + */ + protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) { + IgfsIpcEndpointConfiguration cfg = new IgfsIpcEndpointConfiguration(); + + cfg.setType(IgfsIpcEndpointType.TCP); + cfg.setPort(DFLT_IPC_PORT + getTestGridIndex(gridName)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override public String getTestGridName() { + return "grid"; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(cacheConfiguration()); + cfg.setFileSystemConfiguration(fsConfiguration(gridName)); + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + cfg.setCommunicationSpi(communicationSpi()); + + return cfg; + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + protected CacheConfiguration[] cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setNearConfiguration(null); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); + cacheCfg.setBackups(0); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + return new CacheConfiguration[] {metaCacheCfg, cacheCfg}; + } + + /** + * Gets IGFS configuration. + * + * @param gridName Grid name. + * @return IGFS configuration. + */ + protected FileSystemConfiguration fsConfiguration(String gridName) throws IgniteCheckedException { + FileSystemConfiguration cfg = new FileSystemConfiguration(); + + cfg.setDataCacheName("partitioned"); + cfg.setMetaCacheName("replicated"); + cfg.setName("igfs"); + cfg.setPrefetchBlocks(1); + cfg.setDefaultMode(mode); + + if (mode != PRIMARY) + cfg.setSecondaryFileSystem( + new IgniteHadoopIgfsSecondaryFileSystem(secondaryFsUriStr, secondaryConfFullPath)); + + cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); + + cfg.setManagementPort(-1); + cfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. + + return cfg; + } + + /** @return Communication SPI. */ + private CommunicationSpi communicationSpi() { + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + return commSpi; + } + + /** + * Case #SecondaryFileSystemProvider(null, path) + * + * @throws Exception On failure. + */ + public void testFsConfigurationOnly() throws Exception { + primaryCfgScheme = IGFS_SCHEME; + primaryCfgAuthority = PRIMARY_AUTHORITY; + passPrimaryConfiguration = true; + primaryFsUriStr = null; + + // wrong secondary URI in the configuration: + secondaryCfgScheme = IGFS_SCHEME; + secondaryCfgAuthority = SECONDARY_AUTHORITY; + passSecondaryConfiguration = true; + secondaryFsUriStr = null; + + check(); + } + + /** + * Case #SecondaryFileSystemProvider(uri, path), when 'uri' parameter overrides + * the Fs uri set in the configuration. + * + * @throws Exception On failure. + */ + public void testFsUriOverridesUriInConfiguration() throws Exception { + // wrong primary URI in the configuration: + primaryCfgScheme = "foo"; + primaryCfgAuthority = "moo:zoo@bee"; + passPrimaryConfiguration = true; + primaryFsUriStr = mkUri(IGFS_SCHEME, PRIMARY_AUTHORITY); + + // wrong secondary URI in the configuration: + secondaryCfgScheme = "foo"; + secondaryCfgAuthority = "moo:zoo@bee"; + passSecondaryConfiguration = true; + secondaryFsUriStr = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY); + + check(); + } + + /** + * Perform actual check. + * + * @throws Exception If failed. + */ + @SuppressWarnings("deprecation") + private void check() throws Exception { + before(); + + try { + Path fsHome = new Path(primaryFsUri); + Path dir = new Path(fsHome, "/someDir1/someDir2/someDir3"); + Path file = new Path(dir, "someFile"); + + assertPathDoesNotExist(primaryFs, file); + + FsPermission fsPerm = new FsPermission((short)644); + + FSDataOutputStream os = primaryFs.create(file, fsPerm, false, 1, (short)1, 1L, null); + + // Try to write something in file. + os.write("abc".getBytes()); + + os.close(); + + // Check file status. + FileStatus fileStatus = primaryFs.getFileStatus(file); + + assertFalse(fileStatus.isDir()); + assertEquals(file, fileStatus.getPath()); + assertEquals(fsPerm, fileStatus.getPermission()); + } + finally { + after(); + } + } + + /** + * Create configuration for test. + * + * @param skipEmbed Whether to skip embedded mode. + * @param skipLocShmem Whether to skip local shmem mode. + * @return Configuration. + */ + static Configuration configuration(String scheme, String authority, boolean skipEmbed, boolean skipLocShmem) { + final Configuration cfg = new Configuration(); + + if (scheme != null && authority != null) + cfg.set("fs.defaultFS", scheme + "://" + authority + "/"); + + setImplClasses(cfg); + + if (authority != null) { + if (skipEmbed) + cfg.setBoolean(String.format(HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED, authority), true); + + if (skipLocShmem) + cfg.setBoolean(String.format(HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority), true); + } + + return cfg; + } + + /** + * Sets Hadoop Fs implementation classes. + * + * @param cfg the configuration to set parameters into. + */ + static void setImplClasses(Configuration cfg) { + cfg.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); + + cfg.set("fs.AbstractFileSystem.igfs.impl", + org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem.class.getName()); + } + + /** + * Check path does not exist in a given FileSystem. + * + * @param fs FileSystem to check. + * @param path Path to check. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + private void assertPathDoesNotExist(final FileSystem fs, final Path path) { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fs.getFileStatus(path); + } + }, FileNotFoundException.class, null); + } + + /** + * Writes down the configuration to local disk and returns its path. + * + * @param cfg the configuration to write. + * @param pathFromIgniteHome path relatively to Ignite home. + * @return Full path of the written configuration. + */ + static String writeConfiguration(Configuration cfg, String pathFromIgniteHome) throws IOException { + if (!pathFromIgniteHome.startsWith("/")) + pathFromIgniteHome = "/" + pathFromIgniteHome; + + final String path = U.getIgniteHome() + pathFromIgniteHome; + + delete(path); + + File file = new File(path); + + try (FileOutputStream fos = new FileOutputStream(file)) { + cfg.writeXml(fos); + } + + assertTrue(file.exists()); + return path; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60 * 1000; + } + + /** + * Makes URI. + * + * @param scheme the scheme + * @param authority the authority + * @return URI String + */ + static String mkUri(String scheme, String authority) { + return scheme + "://" + authority + "/"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java new file mode 100644 index 0000000..a9d7bad --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import junit.framework.TestSuite; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint; +import org.apache.ignite.internal.util.typedef.G; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; +import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; + +/** + * Test suite for IGFS event tests. + */ +@SuppressWarnings("PublicInnerClass") +public class IgfsEventsTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + ClassLoader ldr = TestSuite.class.getClassLoader(); + + TestSuite suite = new TestSuite("Ignite FS Events Test Suite"); + + suite.addTest(new TestSuite(ldr.loadClass(ShmemPrimary.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(ShmemDualSync.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(ShmemDualAsync.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrimary.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName()))); + + return suite; + } + + /** + * @return Test suite with only tests that are supported on all platforms. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suiteNoarchOnly() throws Exception { + ClassLoader ldr = TestSuite.class.getClassLoader(); + + TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch Only"); + + suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrimary.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName()))); + + return suite; + } + + /** + * Shared memory IPC in PRIVATE mode. + */ + public static class ShmemPrimary extends IgfsEventsAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + + igfsCfg.setDefaultMode(IgfsMode.PRIMARY); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.SHMEM); + endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + return igfsCfg; + } + } + + /** + * Loopback socket IPS in PRIVATE mode. + */ + public static class LoopbackPrimary extends IgfsEventsAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + + igfsCfg.setDefaultMode(IgfsMode.PRIMARY); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + return igfsCfg; + } + } + + /** + * Base class for all IGFS tests with primary and secondary file system. + */ + public abstract static class PrimarySecondaryTest extends IgfsEventsAbstractSelfTest { + /** Secondary file system. */ + private static IgniteFileSystem igfsSec; + + /** {@inheritDoc} */ + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + + igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( + "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/", + "modules/core/src/test/config/hadoop/core-site-secondary.xml")); + + return igfsCfg; + } + + /** + * @return IGFS configuration for secondary file system. + */ + protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + + igfsCfg.setName("igfs-secondary"); + igfsCfg.setDefaultMode(PRIMARY); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(11500); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + return igfsCfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + igfsSec = startSecondary(); + + super.beforeTestsStarted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + G.stopAll(true); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + // Clean up secondary file system. + igfsSec.format(); + } + + /** + * Start a grid with the secondary file system. + * + * @return Secondary file system handle. + * @throws Exception If failed. + */ + @Nullable private IgniteFileSystem startSecondary() throws Exception { + IgniteConfiguration cfg = getConfiguration("grid-secondary", getSecondaryIgfsConfiguration()); + + cfg.setLocalHost("127.0.0.1"); + cfg.setPeerClassLoadingEnabled(false); + + Ignite secG = G.start(cfg); + + return secG.fileSystem("igfs-secondary"); + } + } + + /** + * Shared memory IPC in DUAL_SYNC mode. + */ + public static class ShmemDualSync extends PrimarySecondaryTest { + /** {@inheritDoc} */ + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + + igfsCfg.setDefaultMode(DUAL_SYNC); + + return igfsCfg; + } + } + + /** + * Shared memory IPC in DUAL_SYNC mode. + */ + public static class ShmemDualAsync extends PrimarySecondaryTest { + /** {@inheritDoc} */ + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + + igfsCfg.setDefaultMode(DUAL_ASYNC); + + return igfsCfg; + } + } + + /** + * Loopback socket IPC with secondary file system. + */ + public abstract static class LoopbackPrimarySecondaryTest extends PrimarySecondaryTest { + /** {@inheritDoc} */ + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + + igfsCfg.setDefaultMode(IgfsMode.PRIMARY); + + igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( + "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/", + "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml")); + + return igfsCfg; + } + + /** {@inheritDoc} */ + @Override protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getSecondaryIgfsConfiguration(); + + igfsCfg.setName("igfs-secondary"); + igfsCfg.setDefaultMode(PRIMARY); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(11500); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + return igfsCfg; + } + } + + /** + * Loopback IPC in DUAL_SYNC mode. + */ + public static class LoopbackDualSync extends LoopbackPrimarySecondaryTest { + /** {@inheritDoc} */ + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + + igfsCfg.setDefaultMode(DUAL_SYNC); + + return igfsCfg; + } + } + + /** + * Loopback socket IPC in DUAL_ASYNC mode. + */ + public static class LoopbackDualAsync extends LoopbackPrimarySecondaryTest { + /** {@inheritDoc} */ + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + + igfsCfg.setDefaultMode(DUAL_ASYNC); + + return igfsCfg; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java new file mode 100644 index 0000000..8e79356 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; +import static org.apache.ignite.events.EventType.EVT_TASK_FAILED; +import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED; + +/** + * Test hadoop file system implementation. + */ +public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { + /** Path to the default hadoop configuration. */ + public static final String HADOOP_FS_CFG = "examples/config/filesystem/core-site.xml"; + + /** Group size. */ + public static final int GRP_SIZE = 128; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Node count. */ + private int cnt; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(nodeCount()); + + grid(0).createNearCache("data", new NearCacheConfiguration()); + + grid(0).createNearCache("meta", new NearCacheConfiguration()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + G.stopAll(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true)); + + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("data"); + igfsCfg.setMetaCacheName("meta"); + igfsCfg.setName("igfs"); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.SHMEM); + endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. + + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setCacheConfiguration(cacheConfiguration(gridName, "data"), cacheConfiguration(gridName, "meta")); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + if (cnt == 0) + cfg.setClientMode(true); + + cnt++; + + return cfg; + } + + /** @return Node count for test. */ + protected int nodeCount() { + return 4; + } + + /** + * Gets cache configuration. + * + * @param gridName Grid name. + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration(String gridName, String cacheName) { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName(cacheName); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); + cacheCfg.setBackups(0); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + return cacheCfg; + } + + /** + * Gets config of concrete File System. + * + * @return Config of concrete File System. + */ + protected Configuration getFileSystemConfig() { + Configuration cfg = new Configuration(); + + cfg.addResource(U.resolveIgniteUrl(HADOOP_FS_CFG)); + + return cfg; + } + + /** + * Gets File System name. + * + * @param grid Grid index. + * @return File System name. + */ + protected URI getFileSystemURI(int grid) { + try { + return new URI("igfs://127.0.0.1:" + (IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + grid)); + } + catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + /** @throws Exception If failed. */ + public void testContentsConsistency() throws Exception { + try (FileSystem fs = FileSystem.get(getFileSystemURI(0), getFileSystemConfig())) { + Collection<IgniteBiTuple<String, Long>> files = F.asList( + F.t("/dir1/dir2/file1", 1024L), + F.t("/dir1/dir2/file2", 8 * 1024L), + F.t("/dir1/file1", 1024 * 1024L), + F.t("/dir1/file2", 5 * 1024 * 1024L), + F.t("/file1", 64 * 1024L + 13), + F.t("/file2", 13L), + F.t("/file3", 123764L) + ); + + for (IgniteBiTuple<String, Long> file : files) { + + info("Writing file: " + file.get1()); + + try (OutputStream os = fs.create(new Path(file.get1()), (short)3)) { + byte[] data = new byte[file.get2().intValue()]; + + data[0] = 25; + data[data.length - 1] = 26; + + os.write(data); + } + + info("Finished writing file: " + file.get1()); + } + + for (int i = 1; i < nodeCount(); i++) { + + try (FileSystem ignored = FileSystem.get(getFileSystemURI(i), getFileSystemConfig())) { + for (IgniteBiTuple<String, Long> file : files) { + Path path = new Path(file.get1()); + + FileStatus fileStatus = fs.getFileStatus(path); + + assertEquals(file.get2(), (Long)fileStatus.getLen()); + + byte[] read = new byte[file.get2().intValue()]; + + info("Reading file: " + path); + + try (FSDataInputStream in = fs.open(path)) { + in.readFully(read); + + assert read[0] == 25; + assert read[read.length - 1] == 26; + } + + info("Finished reading file: " + path); + } + } + } + } + } +} \ No newline at end of file