http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java new file mode 100644 index 0000000..8ddb359 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.igfs.common.IgfsLogger; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfs; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutProc; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.processors.igfs.IgfsContext; +import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; +import org.apache.ignite.internal.processors.igfs.IgfsServer; +import org.apache.ignite.internal.processors.igfs.IgfsServerHandler; +import org.apache.ignite.internal.processors.igfs.IgfsServerManager; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT; + +/** + * Test interaction between a IGFS client and a IGFS server. + */ +public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest { + /** Logger. */ + private static final Log LOG = LogFactory.getLog(IgniteHadoopFileSystemClientSelfTest.class); + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + G.stopAll(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("partitioned"); + igfsCfg.setMetaCacheName("replicated"); + igfsCfg.setName("igfs"); + igfsCfg.setBlockSize(512 * 1024); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(DFLT_IPC_PORT); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + cfg.setCacheConfiguration(cacheConfiguration()); + cfg.setFileSystemConfiguration(igfsCfg); + + return cfg; + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + protected CacheConfiguration[] cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setNearConfiguration(null); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setEvictionPolicy(null); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); + cacheCfg.setBackups(0); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + metaCacheCfg.setEvictionPolicy(null); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + return new CacheConfiguration[] {metaCacheCfg, cacheCfg}; + } + + /** + * Test output stream deferred exception (GG-4440). + * + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testOutputStreamDeferredException() throws Exception { + final byte[] data = "test".getBytes(); + + try { + switchHandlerErrorFlag(true); + + HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG, null); + + client.handshake(null); + + IgfsPath path = new IgfsPath("/test1.file"); + + HadoopIgfsStreamDelegate delegate = client.create(path, true, false, 1, 1024, null); + + final HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(delegate, LOG, + IgfsLogger.disabledLogger(), 0); + + // This call should return fine as exception is thrown for the first time. + igfsOut.write(data); + + U.sleep(500); + + // This call should throw an IO exception. + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + igfsOut.write(data); + + return null; + } + }, IOException.class, "Failed to write data to server (test)."); + } + finally { + switchHandlerErrorFlag(false); + } + } + + /** + * Set IGFS REST handler error flag to the given state. + * + * @param flag Flag state. + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + private void switchHandlerErrorFlag(boolean flag) throws Exception { + IgfsProcessorAdapter igfsProc = ((IgniteKernal)grid(0)).context().igfs(); + + Map<String, IgfsContext> igfsMap = getField(igfsProc, "igfsCache"); + + IgfsServerManager srvMgr = F.first(igfsMap.values()).server(); + + Collection<IgfsServer> srvrs = getField(srvMgr, "srvrs"); + + IgfsServerHandler igfsHnd = getField(F.first(srvrs), "hnd"); + + Field field = igfsHnd.getClass().getDeclaredField("errWrite"); + + field.setAccessible(true); + + field.set(null, flag); + } + + /** + * Get value of the field with the given name of the given object. + * + * @param obj Object. + * @param fieldName Field name. + * @return Value of the field. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private <T> T getField(Object obj, String fieldName) throws Exception { + Field field = obj.getClass().getDeclaredField(fieldName); + + field.setAccessible(true); + + return (T)field.get(obj); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java new file mode 100644 index 0000000..fdb0d77 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Callable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT; + +/** + * Tests for IGFS file system handshake. + */ +public class IgniteHadoopFileSystemHandshakeSelfTest extends IgfsCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grid name. */ + private static final String GRID_NAME = "grid"; + + /** IGFS name. */ + private static final String IGFS_NAME = "igfs"; + + /** IGFS path. */ + private static final IgfsPath PATH = new IgfsPath("/path"); + + /** A host-port pair used for URI in embedded mode. */ + private static final String HOST_PORT_UNUSED = "somehost:65333"; + + /** Flag defines if to use TCP or embedded connection mode: */ + private boolean tcp = false; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** + * Tests for Grid and IGFS having normal names. + * + * @throws Exception If failed. + */ + public void testHandshake() throws Exception { + startUp(false, false); + + tcp = true; + + checkValid(IGFS_NAME + ":" + GRID_NAME + "@"); + checkValid(IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1"); + checkValid(IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(IGFS_NAME + "@"); + checkValid(IGFS_NAME + "@127.0.0.1"); + checkValid(IGFS_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(":" + GRID_NAME + "@"); + checkValid(":" + GRID_NAME + "@127.0.0.1"); + checkValid(":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(""); + checkValid("127.0.0.1"); + checkValid("127.0.0.1:" + DFLT_IPC_PORT); + + tcp = false; // Embedded mode: + + checkValid(IGFS_NAME + ":" + GRID_NAME + "@"); + checkValid(IGFS_NAME + ":" + GRID_NAME + "@" + HOST_PORT_UNUSED); + + checkValid(IGFS_NAME + "@"); // Embedded mode fails, but remote tcp succeeds. + checkInvalid(IGFS_NAME + "@" + HOST_PORT_UNUSED); + + checkValid(":" + GRID_NAME + "@"); // Embedded mode fails, but remote tcp succeeds. + checkInvalid(":" + GRID_NAME + "@" + HOST_PORT_UNUSED); + + checkValid("@"); // Embedded mode fails, but remote tcp succeeds. + checkInvalid("@" + HOST_PORT_UNUSED); + } + + /** + * Tests for Grid having {@code null} name and IGFS having normal name. + * + * @throws Exception If failed. + */ + public void testHandshakeDefaultGrid() throws Exception { + startUp(true, false); + + tcp = true; + + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@"); + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(IGFS_NAME + "@"); + checkValid(IGFS_NAME + "@127.0.0.1"); + checkValid(IGFS_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(":" + GRID_NAME + "@"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(""); + checkValid("127.0.0.1"); + checkValid("127.0.0.1:" + DFLT_IPC_PORT); + + tcp = false; // Embedded mode: + + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@"); + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@" + HOST_PORT_UNUSED); + + checkValid(IGFS_NAME + "@"); + checkValid(IGFS_NAME + "@" + HOST_PORT_UNUSED); + + checkInvalid(":" + GRID_NAME + "@"); + checkInvalid(":" + GRID_NAME + "@" + HOST_PORT_UNUSED); + + checkValid("@"); // Embedded mode fails, but remote tcp succeeds. + checkInvalid("@" + HOST_PORT_UNUSED); + } + + /** + * Tests for Grid having normal name and IGFS having {@code null} name. + * + * @throws Exception If failed. + */ + public void testHandshakeDefaultIgfs() throws Exception { + startUp(false/*grid name*/, true/*default igfs*/); + + tcp = true; + + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@"); + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(IGFS_NAME + "@"); + checkInvalid(IGFS_NAME + "@127.0.0.1"); + checkInvalid(IGFS_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(":" + GRID_NAME + "@"); + checkValid(":" + GRID_NAME + "@127.0.0.1"); + checkValid(":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(""); + checkValid("127.0.0.1"); + checkValid("127.0.0.1:" + DFLT_IPC_PORT); + + tcp = false; // Embedded mode: + + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@"); + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@" + HOST_PORT_UNUSED); + + checkInvalid(IGFS_NAME + "@"); + checkInvalid(IGFS_NAME + "@" + HOST_PORT_UNUSED); + + checkValid(":" + GRID_NAME + "@"); + checkValid(":" + GRID_NAME + "@" + HOST_PORT_UNUSED); + + checkValid("@"); // NB: in embedded mode this fails, but remote TCP still succeeds. + checkInvalid("@" + HOST_PORT_UNUSED); + } + + /** + * Tests for Grid having {@code null} name and IGFS having {@code null} name. + * + * @throws Exception If failed. + */ + public void testHandshakeDefaultGridDefaultIgfs() throws Exception { + startUp(true, true); + + tcp = true; + + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@"); + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(IGFS_NAME + "@"); + checkInvalid(IGFS_NAME + "@127.0.0.1"); + checkInvalid(IGFS_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkInvalid(":" + GRID_NAME + "@"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1"); + checkInvalid(":" + GRID_NAME + "@127.0.0.1:" + DFLT_IPC_PORT); + + checkValid(""); + checkValid("127.0.0.1"); + checkValid("127.0.0.1:" + DFLT_IPC_PORT); + + tcp = false; // Embedded mode: + + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@"); + checkInvalid(IGFS_NAME + ":" + GRID_NAME + "@" + HOST_PORT_UNUSED); + + checkInvalid(IGFS_NAME + "@"); + checkInvalid(IGFS_NAME + "@" + HOST_PORT_UNUSED); + + checkInvalid(":" + GRID_NAME + "@"); + checkInvalid(":" + GRID_NAME + "@" + HOST_PORT_UNUSED); + + checkValid("@"); + checkValid("@" + HOST_PORT_UNUSED); + } + + /** + * Perform startup. + * + * @param dfltGridName Default Grid name. + * @param dfltIgfsName Default IGFS name. + * @throws Exception If failed. + */ + private void startUp(boolean dfltGridName, boolean dfltIgfsName) throws Exception { + Ignite ignite = G.start(gridConfiguration(dfltGridName, dfltIgfsName)); + + IgniteFileSystem igfs = ignite.fileSystem(dfltIgfsName ? null : IGFS_NAME); + + igfs.mkdirs(PATH); + } + + /** + * Create Grid configuration. + * + * @param dfltGridName Default Grid name. + * @param dfltIgfsName Default IGFS name. + * @return Grid configuration. + * @throws Exception If failed. + */ + private IgniteConfiguration gridConfiguration(boolean dfltGridName, boolean dfltIgfsName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(dfltGridName ? null : GRID_NAME); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("partitioned"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setNearConfiguration(null); + dataCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(metaCacheCfg, dataCacheCfg); + + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("partitioned"); + igfsCfg.setMetaCacheName("replicated"); + igfsCfg.setName(dfltIgfsName ? null : IGFS_NAME); + igfsCfg.setPrefetchBlocks(1); + igfsCfg.setDefaultMode(PRIMARY); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(DFLT_IPC_PORT); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + igfsCfg.setManagementPort(-1); + igfsCfg.setBlockSize(512 * 1024); + + cfg.setFileSystemConfiguration(igfsCfg); + + return cfg; + } + + /** + * Check valid file system endpoint. + * + * @param authority Authority. + * @throws Exception If failed. + */ + private void checkValid(String authority) throws Exception { + FileSystem fs = fileSystem(authority, tcp); + + assert fs.exists(new Path(PATH.toString())); + } + + /** + * Check invalid file system endpoint. + * + * @param authority Authority. + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + private void checkInvalid(final String authority) throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fileSystem(authority, tcp); + + return null; + } + }, IOException.class, null); + } + + /** + * Gets the file system using authority and tcp flag. + * + * @param authority Authority. + * @return File system. + * @throws Exception If failed. + */ + private static FileSystem fileSystem(String authority, boolean tcp) throws Exception { + return FileSystem.get(new URI("igfs://" + authority + "/"), configuration(authority, tcp)); + } + + /** + * Create configuration for test. + * + * @param authority Authority. + * @return Configuration. + */ + private static Configuration configuration(String authority, boolean tcp) { + Configuration cfg = new Configuration(); + + cfg.set("fs.defaultFS", "igfs://" + authority + "/"); + cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); + cfg.set("fs.AbstractFileSystem.igfs.impl", + IgniteHadoopFileSystem.class.getName()); + + cfg.setBoolean("fs.igfs.impl.disable.cache", true); + + if (tcp) + cfg.setBoolean(String.format(PARAM_IGFS_ENDPOINT_NO_EMBED, authority), true); + else + cfg.setBoolean(String.format(PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority), true); + + cfg.setBoolean(String.format(PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority), true); + + return cfg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java new file mode 100644 index 0000000..4d7a39e --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import java.lang.reflect.Field; +import java.net.URI; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsIpcIo; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; +import static org.apache.ignite.events.EventType.EVT_TASK_FAILED; +import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED; + +/** + * IPC cache test. + */ +public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Path to test hadoop configuration. */ + private static final String HADOOP_FS_CFG = "modules/core/src/test/config/hadoop/core-site.xml"; + + /** Group size. */ + public static final int GRP_SIZE = 128; + + /** Started grid counter. */ + private static int cnt; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("partitioned"); + igfsCfg.setMetaCacheName("replicated"); + igfsCfg.setName("igfs"); + igfsCfg.setManagementPort(FileSystemConfiguration.DFLT_MGMT_PORT + cnt); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.SHMEM); + endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. + + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setCacheConfiguration(cacheConfiguration()); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + cnt++; + + return cfg; + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + private CacheConfiguration[] cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setNearConfiguration(null); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); + cacheCfg.setBackups(0); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + return new CacheConfiguration[] {metaCacheCfg, cacheCfg}; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(4); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + G.stopAll(true); + } + + /** + * Test how IPC cache map works. + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testIpcCache() throws Exception { + Field cacheField = HadoopIgfsIpcIo.class.getDeclaredField("ipcCache"); + + cacheField.setAccessible(true); + + Field activeCntField = HadoopIgfsIpcIo.class.getDeclaredField("activeCnt"); + + activeCntField.setAccessible(true); + + Map<String, HadoopIgfsIpcIo> cache = (Map<String, HadoopIgfsIpcIo>)cacheField.get(null); + + cache.clear(); // avoid influence of previous tests in the same process. + + String name = "igfs:" + getTestGridName(0) + "@"; + + Configuration cfg = new Configuration(); + + cfg.addResource(U.resolveIgniteUrl(HADOOP_FS_CFG)); + cfg.setBoolean("fs.igfs.impl.disable.cache", true); + cfg.setBoolean(String.format(HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED, name), true); + + // Ensure that existing IO is reused. + FileSystem fs1 = FileSystem.get(new URI("igfs://" + name + "/"), cfg); + + assertEquals(1, cache.size()); + + HadoopIgfsIpcIo io = null; + + System.out.println("CACHE: " + cache); + + for (String key : cache.keySet()) { + if (key.contains("10500")) { + io = cache.get(key); + + break; + } + } + + assert io != null; + + assertEquals(1, ((AtomicInteger)activeCntField.get(io)).get()); + + // Ensure that when IO is used by multiple file systems and one of them is closed, IO is not stopped. + FileSystem fs2 = FileSystem.get(new URI("igfs://" + name + "/abc"), cfg); + + assertEquals(1, cache.size()); + assertEquals(2, ((AtomicInteger)activeCntField.get(io)).get()); + + fs2.close(); + + assertEquals(1, cache.size()); + assertEquals(1, ((AtomicInteger)activeCntField.get(io)).get()); + + Field stopField = HadoopIgfsIpcIo.class.getDeclaredField("stopping"); + + stopField.setAccessible(true); + + assert !(Boolean)stopField.get(io); + + // Ensure that IO is stopped when nobody else is need it. + fs1.close(); + + assert cache.isEmpty(); + + assert (Boolean)stopField.get(io); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerSelfTest.java new file mode 100644 index 0000000..3013311 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerSelfTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.ignite.internal.igfs.common.IgfsLogger; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.DELIM_FIELD; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.DELIM_FIELD_VAL; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.HDR; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_CLOSE_IN; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_CLOSE_OUT; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_DELETE; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_DIR_LIST; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_DIR_MAKE; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_MARK; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_OPEN_IN; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_OPEN_OUT; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_RANDOM_READ; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_RENAME; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_RESET; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_SEEK; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.TYPE_SKIP; + +/** + * Grid IGFS client logger test. + */ +public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest { + /** Path string. */ + private static final String PATH_STR = "/dir1/dir2/file;test"; + + /** Path string with escaped semicolons. */ + private static final String PATH_STR_ESCAPED = PATH_STR.replace(';', '~'); + + /** Path. */ + private static final IgfsPath PATH = new IgfsPath(PATH_STR); + + /** IGFS name. */ + private static final String IGFS_NAME = "igfs"; + + /** Log file path. */ + private static final String LOG_DIR = U.getIgniteHome(); + + /** Endpoint address. */ + private static final String ENDPOINT = "localhost:10500"; + + /** Log file name. */ + private static final String LOG_FILE = LOG_DIR + File.separator + "igfs-log-" + IGFS_NAME + "-" + U.jvmPid() + + ".csv"; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + removeLogs(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + removeLogs(); + } + + /** + * Remove existing logs. + * + * @throws Exception If failed. + */ + private void removeLogs() throws Exception { + File dir = new File(LOG_DIR); + + File[] logs = dir.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("igfs-log-"); + } + }); + + for (File log : logs) + log.delete(); + } + + /** + * Ensure correct static loggers creation/removal as well as file creation. + * + * @throws Exception If failed. + */ + public void testCreateDelete() throws Exception { + IgfsLogger log = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10); + + IgfsLogger sameLog0 = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10); + + // Loggers for the same endpoint must be the same object. + assert log == sameLog0; + + IgfsLogger otherLog = IgfsLogger.logger("other" + ENDPOINT, IGFS_NAME, LOG_DIR, 10); + + // Logger for another endpoint must be different. + assert log != otherLog; + + otherLog.close(); + + log.logDelete(PATH, PRIMARY, false); + + log.close(); + + File logFile = new File(LOG_FILE); + + // When there are multiple loggers, closing one must not force flushing. + assert !logFile.exists(); + + IgfsLogger sameLog1 = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10); + + assert sameLog0 == sameLog1; + + sameLog0.close(); + + assert !logFile.exists(); + + sameLog1.close(); + + // When we cloe the last logger, it must flush data to disk. + assert logFile.exists(); + + logFile.delete(); + + IgfsLogger sameLog2 = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10); + + // This time we expect new logger instance to be created. + assert sameLog0 != sameLog2; + + sameLog2.close(); + + // As we do not add any records to the logger, we do not expect flushing. + assert !logFile.exists(); + } + + /** + * Test read operations logging. + * + * @throws Exception If failed. + */ + public void testLogRead() throws Exception { + IgfsLogger log = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10); + + log.logOpen(1, PATH, PRIMARY, 2, 3L); + log.logRandomRead(1, 4L, 5); + log.logSeek(1, 6L); + log.logSkip(1, 7L); + log.logMark(1, 8L); + log.logReset(1); + log.logCloseIn(1, 9L, 10L, 11); + + log.close(); + + checkLog( + new SB().a(U.jvmPid() + d() + TYPE_OPEN_IN + d() + PATH_STR_ESCAPED + d() + PRIMARY + d() + 1 + d() + 2 + + d() + 3 + d(14)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_RANDOM_READ + d(3) + 1 + d(7) + 4 + d() + 5 + d(8)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_SEEK + d(3) + 1 + d(7) + 6 + d(9)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_SKIP + d(3) + 1 + d(9) + 7 + d(7)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_MARK + d(3) + 1 + d(10) + 8 + d(6)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_RESET + d(3) + 1 + d(16)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_CLOSE_IN + d(3) + 1 + d(11) + 9 + d() + 10 + d() + 11 + d(3)).toString() + ); + } + + /** + * Test write operations logging. + * + * @throws Exception If failed. + */ + public void testLogWrite() throws Exception { + IgfsLogger log = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10); + + log.logCreate(1, PATH, PRIMARY, true, 2, new Integer(3).shortValue(), 4L); + log.logAppend(2, PATH, PRIMARY, 8); + log.logCloseOut(2, 9L, 10L, 11); + + log.close(); + + checkLog( + new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + PRIMARY + d() + 1 + d() + + 2 + d(2) + 0 + d() + 1 + d() + 3 + d() + 4 + d(10)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + PRIMARY + d() + 2 + d() + + 8 + d(2) + 1 + d(13)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_CLOSE_OUT + d(3) + 2 + d(11) + 9 + d() + 10 + d() + 11 + d(3)) + .toString() + ); + } + + /** + * Test miscellaneous operations logging. + * + * @throws Exception If failed. + */ + @SuppressWarnings("TooBroadScope") + public void testLogMisc() throws Exception { + IgfsLogger log = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10); + + String newFile = "/dir3/file.test"; + String file1 = "/dir3/file1.test"; + String file2 = "/dir3/file1.test"; + + log.logMakeDirectory(PATH, PRIMARY); + log.logRename(PATH, PRIMARY, new IgfsPath(newFile)); + log.logListDirectory(PATH, PRIMARY, new String[] { file1, file2 }); + log.logDelete(PATH, PRIMARY, false); + + log.close(); + + checkLog( + new SB().a(U.jvmPid() + d() + TYPE_DIR_MAKE + d() + PATH_STR_ESCAPED + d() + PRIMARY + d(17)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_RENAME + d() + PATH_STR_ESCAPED + d() + PRIMARY + d(15) + newFile + + d(2)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_DIR_LIST + d() + PATH_STR_ESCAPED + d() + PRIMARY + d(17) + file1 + + DELIM_FIELD_VAL + file2).toString(), + new SB().a(U.jvmPid() + d() + TYPE_DELETE + d(1) + PATH_STR_ESCAPED + d() + PRIMARY + d(16) + 0 + + d()).toString() + ); + } + + /** + * Ensure that log file has only the following lines. + * + * @param lines Expected lines. + */ + private void checkLog(String... lines) throws Exception { + BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(LOG_FILE))); + + List<String> logLines = new ArrayList<>(lines.length); + + String nextLogLine; + + while ((nextLogLine = br.readLine()) != null) + logLines.add(nextLogLine); + + U.closeQuiet(br); + + assertEquals(lines.length + 1, logLines.size()); + + assertEquals(logLines.get(0), HDR); + + for (int i = 0; i < lines.length; i++) { + String logLine = logLines.get(i + 1); + + logLine = logLine.substring(logLine.indexOf(DELIM_FIELD, logLine.indexOf(DELIM_FIELD) + 1) + 1); + + assertEquals(lines[i], logLine); + } + } + + /** + * Return single field delimiter. + * + * @return Single field delimiter. + */ + private String d() { + return d(1); + } + + /** + * Return a bunch of field delimiters. + * + * @param cnt Amount of field delimiters. + * @return Field delimiters. + */ + private String d(int cnt) { + SB buf = new SB(); + + for (int i = 0; i < cnt; i++) + buf.a(DELIM_FIELD); + + return buf.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java new file mode 100644 index 0000000..1bd5b41 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import java.lang.reflect.Field; +import java.net.URI; +import java.nio.file.Paths; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.internal.igfs.common.IgfsLogger; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED; + +/** + * Ensures that sampling is really turned on/off. + */ +public class IgniteHadoopFileSystemLoggerStateSelfTest extends IgfsCommonAbstractTest { + /** IGFS. */ + private IgfsEx igfs; + + /** File system. */ + private FileSystem fs; + + /** Whether logging is enabled in FS configuration. */ + private boolean logging; + + /** whether sampling is enabled. */ + private Boolean sampling; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + U.closeQuiet(fs); + + igfs = null; + fs = null; + + G.stopAll(true); + + logging = false; + sampling = null; + } + + /** + * Startup the grid and instantiate the file system. + * + * @throws Exception If failed. + */ + private void startUp() throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("partitioned"); + igfsCfg.setMetaCacheName("replicated"); + igfsCfg.setName("igfs"); + igfsCfg.setBlockSize(512 * 1024); + igfsCfg.setDefaultMode(PRIMARY); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(10500); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setNearConfiguration(null); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); + cacheCfg.setBackups(0); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("igfs-grid"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + Ignite g = G.start(cfg); + + igfs = (IgfsEx)g.fileSystem("igfs"); + + igfs.globalSampling(sampling); + + fs = fileSystem(); + } + + /** + * When logging is disabled and sampling is not set no-op logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingDisabledSamplingNotSet() throws Exception { + startUp(); + + assert !logEnabled(); + } + + /** + * When logging is enabled and sampling is not set file logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingEnabledSamplingNotSet() throws Exception { + logging = true; + + startUp(); + + assert logEnabled(); + } + + /** + * When logging is disabled and sampling is disabled no-op logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingDisabledSamplingDisabled() throws Exception { + sampling = false; + + startUp(); + + assert !logEnabled(); + } + + /** + * When logging is enabled and sampling is disabled no-op logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingEnabledSamplingDisabled() throws Exception { + logging = true; + sampling = false; + + startUp(); + + assert !logEnabled(); + } + + /** + * When logging is disabled and sampling is enabled file logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingDisabledSamplingEnabled() throws Exception { + sampling = true; + + startUp(); + + assert logEnabled(); + } + + /** + * When logging is enabled and sampling is enabled file logger must be used. + * + * @throws Exception If failed. + */ + public void testLoggingEnabledSamplingEnabled() throws Exception { + logging = true; + sampling = true; + + startUp(); + + assert logEnabled(); + } + + /** + * Ensure sampling change through API causes changes in logging on subsequent client connections. + * + * @throws Exception If failed. + */ + public void testSamplingChange() throws Exception { + // Start with sampling not set. + startUp(); + + assert !logEnabled(); + + fs.close(); + + // "Not set" => true transition. + igfs.globalSampling(true); + + fs = fileSystem(); + + assert logEnabled(); + + fs.close(); + + // True => "not set" transition. + igfs.globalSampling(null); + + fs = fileSystem(); + + assert !logEnabled(); + + // "Not-set" => false transition. + igfs.globalSampling(false); + + fs = fileSystem(); + + assert !logEnabled(); + + fs.close(); + + // False => "not=set" transition. + igfs.globalSampling(null); + + fs = fileSystem(); + + assert !logEnabled(); + + fs.close(); + + // True => false transition. + igfs.globalSampling(true); + igfs.globalSampling(false); + + fs = fileSystem(); + + assert !logEnabled(); + + fs.close(); + + // False => true transition. + igfs.globalSampling(true); + + fs = fileSystem(); + + assert logEnabled(); + } + + /** + * Ensure that log directory is set to IGFS when client FS connects. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + public void testLogDirectory() throws Exception { + startUp(); + + assertEquals(Paths.get(U.getIgniteHome()).normalize().toString(), + igfs.clientLogDirectory()); + } + + /** + * Instantiate new file system. + * + * @return New file system. + * @throws Exception If failed. + */ + private IgniteHadoopFileSystem fileSystem() throws Exception { + Configuration fsCfg = new Configuration(); + + fsCfg.addResource(U.resolveIgniteUrl("modules/core/src/test/config/hadoop/core-site-loopback.xml")); + + fsCfg.setBoolean("fs.igfs.impl.disable.cache", true); + + if (logging) + fsCfg.setBoolean(String.format(PARAM_IGFS_LOG_ENABLED, "igfs:igfs-grid@"), logging); + + fsCfg.setStrings(String.format(PARAM_IGFS_LOG_DIR, "igfs:igfs-grid@"), U.getIgniteHome()); + + return (IgniteHadoopFileSystem)FileSystem.get(new URI("igfs://igfs:igfs-grid@/"), fsCfg); + } + + /** + * Ensure that real logger is used by the file system. + * + * @return {@code True} in case path is secondary. + * @throws Exception If failed. + */ + private boolean logEnabled() throws Exception { + assert fs != null; + + Field field = fs.getClass().getDeclaredField("clientLog"); + + field.setAccessible(true); + + return ((IgfsLogger)field.get(fs)).isLogEnabled(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackAbstractSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackAbstractSelfTest.java new file mode 100644 index 0000000..6ed2249 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackAbstractSelfTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT; + +/** + * IGFS Hadoop file system IPC loopback self test. + */ +public abstract class IgniteHadoopFileSystemLoopbackAbstractSelfTest extends + IgniteHadoopFileSystemAbstractSelfTest { + /** + * Constructor. + * + * @param mode IGFS mode. + * @param skipEmbed Skip embedded mode flag. + */ + protected IgniteHadoopFileSystemLoopbackAbstractSelfTest(IgfsMode mode, boolean skipEmbed) { + super(mode, skipEmbed, true); + } + + /** {@inheritDoc} */ + @Override protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) { + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(DFLT_IPC_PORT + getTestGridIndex(gridName)); + + return endpointCfg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.java new file mode 100644 index 0000000..f1edb28 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; + +/** + * IGFS Hadoop file system IPC loopback self test in DUAL_ASYNC mode. + */ +public class IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest extends + IgniteHadoopFileSystemLoopbackAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest() { + super(DUAL_ASYNC, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest.java new file mode 100644 index 0000000..97a6991 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; + +/** + * IGFS Hadoop file system IPC loopback self test in DUAL_SYNC mode. + */ +public class IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest + extends IgniteHadoopFileSystemLoopbackAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest() { + super(DUAL_SYNC, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest.java new file mode 100644 index 0000000..f9ecc4b --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; + +/** + * IGFS Hadoop file system IPC loopback self test in PRIMARY mode. + */ +public class IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest + extends IgniteHadoopFileSystemLoopbackAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest() { + super(PRIMARY, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest.java new file mode 100644 index 0000000..719df6d --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PROXY; + +/** + * IGFS Hadoop file system IPC loopback self test in SECONDARY mode. + */ +public class IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest extends + IgniteHadoopFileSystemLoopbackAbstractSelfTest { + + /** + * Constructor. + */ + public IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest() { + super(PROXY, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest.java new file mode 100644 index 0000000..764624d --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; + +/** + * IGFS Hadoop file system IPC loopback self test in DUAL_ASYNC mode. + */ +public class IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest extends + IgniteHadoopFileSystemLoopbackAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest() { + super(DUAL_ASYNC, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.java new file mode 100644 index 0000000..21a248a --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; + +/** + * IGFS Hadoop file system IPC loopback self test in DUAL_SYNC mode. + */ +public class IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest + extends IgniteHadoopFileSystemLoopbackAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest() { + super(DUAL_SYNC, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.java new file mode 100644 index 0000000..092c7a5 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; + +/** + * IGFS Hadoop file system IPC loopback self test in PRIMARY mode. + */ +public class IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest + extends IgniteHadoopFileSystemLoopbackAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest() { + super(PRIMARY, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.java new file mode 100644 index 0000000..9f7d21b --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PROXY; + +/** + * IGFS Hadoop file system IPC loopback self test in SECONDARY mode. + */ +public class IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest extends + IgniteHadoopFileSystemLoopbackAbstractSelfTest { + + /** + * Constructor. + */ + public IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest() { + super(PROXY, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java new file mode 100644 index 0000000..1b48870 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + +import java.net.URI; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; + +/** + * Ensures correct modes resolution for SECONDARY paths. + */ +public class IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest extends IgfsCommonAbstractTest { + /** File system. */ + private IgniteHadoopFileSystem fs; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + U.closeQuiet(fs); + + fs = null; + + G.stopAll(true); + } + + /** + * Perform initial startup. + * + * @param initDfltPathModes WHether to initialize default path modes. + * @throws Exception If failed. + */ + @SuppressWarnings({"NullableProblems", "unchecked"}) + private void startUp(boolean initDfltPathModes) throws Exception { + startUpSecondary(); + + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("partitioned"); + igfsCfg.setMetaCacheName("replicated"); + igfsCfg.setName("igfs"); + igfsCfg.setBlockSize(512 * 1024); + igfsCfg.setInitializeDefaultPathModes(initDfltPathModes); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(10500); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + igfsCfg.setManagementPort(-1); + igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( + "igfs://igfs-secondary:igfs-grid-secondary@127.0.0.1:11500/", + "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml")); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setNearConfiguration(null); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); + cacheCfg.setBackups(0); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("igfs-grid"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + + G.start(cfg); + + Configuration fsCfg = new Configuration(); + + fsCfg.addResource(U.resolveIgniteUrl("modules/core/src/test/config/hadoop/core-site-loopback.xml")); + + fsCfg.setBoolean("fs.igfs.impl.disable.cache", true); + + fs = (IgniteHadoopFileSystem)FileSystem.get(new URI("igfs://igfs:igfs-grid@/"), fsCfg); + } + + /** + * Startup secondary file system. + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private void startUpSecondary() throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("partitioned"); + igfsCfg.setMetaCacheName("replicated"); + igfsCfg.setName("igfs-secondary"); + igfsCfg.setBlockSize(512 * 1024); + igfsCfg.setDefaultMode(PRIMARY); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(11500); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setNearConfiguration(null); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); + cacheCfg.setBackups(0); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("igfs-grid-secondary"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + + G.start(cfg); + } + + /** + * Test scenario when defaults are initialized. + * + * @throws Exception If failed. + */ + public void testDefaultsInitialized() throws Exception { + check(true); + } + + /** + * Test scenario when defaults are not initialized. + * + * @throws Exception If failed. + */ + public void testDefaultsNotInitialized() throws Exception { + check(false); + } + + /** + * Actual check. + * + * @param initDfltPathModes Whether to initialize default path modes. + * @throws Exception If failed. + */ + private void check(boolean initDfltPathModes) throws Exception { + startUp(initDfltPathModes); + + assertEquals(initDfltPathModes, fs.hasSecondaryFileSystem()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java new file mode 100644 index 0000000..d8cf74c --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.ipc.IpcEndpoint; +import org.apache.ignite.internal.util.ipc.IpcEndpointFactory; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT; + +/** + * IGFS Hadoop file system IPC self test. + */ +public abstract class IgniteHadoopFileSystemShmemAbstractSelfTest extends IgniteHadoopFileSystemAbstractSelfTest { + /** + * Constructor. + * + * @param mode IGFS mode. + * @param skipEmbed Skip embedded mode flag. + */ + protected IgniteHadoopFileSystemShmemAbstractSelfTest(IgfsMode mode, boolean skipEmbed) { + super(mode, skipEmbed, false); + } + + /** {@inheritDoc} */ + @Override protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) { + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.SHMEM); + endpointCfg.setPort(DFLT_IPC_PORT + getTestGridIndex(gridName)); + + return endpointCfg; + } + + /** + * Checks correct behaviour in case when we run out of system + * resources. + * + * @throws Exception If error occurred. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testOutOfResources() throws Exception { + final Collection<IpcEndpoint> eps = new LinkedList<>(); + + try { + IgniteCheckedException e = (IgniteCheckedException)GridTestUtils.assertThrows(log, new Callable<Object>() { + @SuppressWarnings("InfiniteLoopStatement") + @Override public Object call() throws Exception { + while (true) { + IpcEndpoint ep = IpcEndpointFactory.connectEndpoint("shmem:10500", log); + + eps.add(ep); + } + } + }, IgniteCheckedException.class, null); + + assertNotNull(e); + + String msg = e.getMessage(); + + assertTrue("Invalid exception: " + X.getFullStackTrace(e), + msg.contains("(error code: 28)") || + msg.contains("(error code: 24)") || + msg.contains("(error code: 12)")); + } + finally { + for (IpcEndpoint ep : eps) + ep.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java new file mode 100644 index 0000000..d0d570f --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; + +/** + * IGFS Hadoop file system IPC shmem self test in DUAL_ASYNC mode. + */ +public class IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest() { + super(DUAL_ASYNC, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java new file mode 100644 index 0000000..2e5b015 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; + +/** + * IGFS Hadoop file system IPC shmem self test in DUAL_SYNC mode. + */ +public class IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest() { + super(DUAL_SYNC, false); + } +} \ No newline at end of file