http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java new file mode 100644 index 0000000..214c2a8 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; + +/** + * IGFS Hadoop file system IPC shmem self test in PRIMARY mode. + */ +public class IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest() { + super(PRIMARY, false); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java new file mode 100644 index 0000000..d7f34a1 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PROXY; + +/** + * IGFS Hadoop file system IPC shmem self test in SECONDARY mode. + */ +public class IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. 
+ */ + public IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest() { + super(PROXY, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java new file mode 100644 index 0000000..0435eaa --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; + +/** + * IGFS Hadoop file system IPC shmem self test in DUAL_ASYNC mode. + */ +public class IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. 
+ */ + public IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest() { + super(DUAL_ASYNC, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java new file mode 100644 index 0000000..3af7274 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; + +/** + * IGFS Hadoop file system IPC shmem self test in DUAL_SYNC mode. + */ +public class IgniteHadoopFileSystemShmemExternalDualSyncSelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. 
+ */ + public IgniteHadoopFileSystemShmemExternalDualSyncSelfTest() { + super(DUAL_SYNC, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java new file mode 100644 index 0000000..ce9dbd9 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; + +/** + * IGFS Hadoop file system IPC shmem self test in PRIMARY mode. + */ +public class IgniteHadoopFileSystemShmemExternalPrimarySelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. 
+ */ + public IgniteHadoopFileSystemShmemExternalPrimarySelfTest() { + super(PRIMARY, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java new file mode 100644 index 0000000..bc8c182 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PROXY; + +/** + * IGFS Hadoop file system IPC shmem self test in SECONDARY mode. + */ +public class IgniteHadoopFileSystemShmemExternalSecondarySelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. 
+ */ + public IgniteHadoopFileSystemShmemExternalSecondarySelfTest() { + super(PROXY, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java new file mode 100644 index 0000000..3731213 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; +import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsUserContext; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; +import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1; +import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import 
org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; + +/** + * Abstract test of whole cycle of map-reduce processing via Job tracker. + */ +public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest { + /** IGFS block size. */ + protected static final int IGFS_BLOCK_SIZE = 512 * 1024; + + /** Amount of blocks to prefetch. */ + protected static final int PREFETCH_BLOCKS = 1; + + /** Amount of sequential block reads before prefetch is triggered. */ + protected static final int SEQ_READS_BEFORE_PREFETCH = 2; + + /** Secondary file system URI. */ + protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/"; + + /** Secondary file system configuration path. */ + protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; + + /** The user to run Hadoop job on behalf of. */ + protected static final String USER = "vasya"; + + /** Secondary IGFS name. */ + protected static final String SECONDARY_IGFS_NAME = "igfs-secondary"; + + /** Red constant. 
*/ + protected static final int red = 10_000; + + /** Blue constant. */ + protected static final int blue = 20_000; + + /** Green constant. */ + protected static final int green = 15_000; + + /** Yellow constant. */ + protected static final int yellow = 7_000; + + /** The secondary Ignite node. */ + protected Ignite igniteSecondary; + + /** The secondary Fs. */ + protected IgfsSecondaryFileSystem secondaryFs; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** + * Gets owner of a IgfsEx path. + * @param p The path. + * @return The owner. + */ + private static String getOwner(final IgfsEx i, final IgfsPath p) { + return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() { + @Override public String apply() { + IgfsFile f = i.info(p); + + assert f != null; + + return f.property(IgfsUtils.PROP_USER_NAME); + } + }); + } + + /** + * Gets owner of a secondary Fs path. + * @param secFs The sec Fs. + * @param p The path. + * @return The owner. + */ + private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) { + return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() { + @Override public String apply() { + return secFs.info(p).property(IgfsUtils.PROP_USER_NAME); + } + }); + } + + /** + * Checks owner of the path. + * @param p The path. + */ + private void checkOwner(IgfsPath p) { + String ownerPrim = getOwner(igfs, p); + assertEquals(USER, ownerPrim); + + String ownerSec = getOwnerSecondary(secondaryFs, p); + assertEquals(USER, ownerSec); + } + + /** + * Does actual test job + * + * @param useNewMapper flag to use new mapper API. + * @param useNewCombiner flag to use new combiner API. + * @param useNewReducer flag to use new reducer API. 
+ */ + protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer) + throws Exception { + igfs.delete(new IgfsPath(PATH_OUTPUT), true); + + JobConf jobConf = new JobConf(); + + jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); + jobConf.setUser(USER); + jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); + + //To split into about 40 items for v2 + jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); + + //For v1 + jobConf.setInt("fs.local.block.size", 65000); + + // File system coordinates. + setupFileSystems(jobConf); + + HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); + + Job job = Job.getInstance(jobConf); + + HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy()); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); + + job.setJarByClass(HadoopWordCount2.class); + + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + fut.get(); + + checkJobStatistics(jobId); + + final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? 
"part-r-" : "part-") + "00000"; + + checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS")); + + checkOwner(new IgfsPath(outFile)); + + String actual = readAndSortFile(outFile, job.getConfiguration()); + + assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + + useNewReducer, + "blue\t" + blue + "\n" + + "green\t" + green + "\n" + + "red\t" + red + "\n" + + "yellow\t" + yellow + "\n", + actual + ); + } + + /** + * Gets if to compress output data with Snappy. + * + * @return If to compress output data with Snappy. + */ + protected boolean compressOutputSnappy() { + return false; + } + + /** + * Simple test job statistics. + * + * @param jobId Job id. + * @throws IgniteCheckedException + */ + private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException { + HadoopCounters cntrs = grid(0).hadoop().counters(jobId); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); + + Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>(); + + Map<String, Integer> phaseOrders = new HashMap<>(); + phaseOrders.put("submit", 0); + phaseOrders.put("prepare", 1); + phaseOrders.put("start", 2); + phaseOrders.put("Cstart", 3); + phaseOrders.put("finish", 4); + + String prevTaskId = null; + + long apiEvtCnt = 0; + + for (T2<String, Long> evt : perfCntr.evts()) { + //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706 + String[] parsedEvt = evt.get1().split(" "); + + String taskId; + String taskPhase; + + if ("JOB".equals(parsedEvt[0])) { + taskId = parsedEvt[0]; + taskPhase = parsedEvt[1]; + } + else { + taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1]; + taskPhase = ("COMBINE".equals(parsedEvt[0]) ? 
"C" : "") + parsedEvt[2]; + } + + if (!taskId.equals(prevTaskId)) + tasks.put(taskId, new TreeMap<Integer,Long>()); + + Integer pos = phaseOrders.get(taskPhase); + + assertNotNull("Invalid phase " + taskPhase, pos); + + tasks.get(taskId).put(pos, evt.get2()); + + prevTaskId = taskId; + + apiEvtCnt++; + } + + for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) { + Map<Integer, Long> order = task.getValue(); + + long prev = 0; + + for (Map.Entry<Integer, Long> phase : order.entrySet()) { + assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev); + + prev = phase.getValue(); + } + } + + final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance"); + + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return igfs.exists(statPath); + } + }, 20_000); + + final long apiEvtCnt0 = apiEvtCnt; + + boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) { + return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }, 10000); + + if (!res) { + BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath))); + + assert false : "Invalid API events count [exp=" + apiEvtCnt0 + + ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']'; + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); + + super.beforeTest(); + } + + /** + * Start grid with IGFS. + * + * @param gridName Grid name. + * @param igfsName IGFS name + * @param mode IGFS mode. + * @param secondaryFs Secondary file system (optional). 
+ * @param restCfg Rest configuration string (optional). + * @return Started grid instance. + * @throws Exception If failed. + */ + protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(igfsName); + igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); + igfsCfg.setDefaultMode(mode); + igfsCfg.setIpcEndpointConfiguration(restCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); + igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setNearConfiguration(null); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + HadoopConfiguration hadoopCfg = 
createHadoopConfiguration(); + + if (hadoopCfg != null) + cfg.setHadoopConfiguration(hadoopCfg); + + return G.start(cfg); + } + + /** + * Creates custom Hadoop configuration. + * + * @return The Hadoop configuration. + */ + protected HadoopConfiguration createHadoopConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { + FileSystemConfiguration fsCfg = super.igfsConfiguration(); + + secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); + + fsCfg.setSecondaryFileSystem(secondaryFs); + + return fsCfg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java new file mode 100644 index 0000000..fb16988 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.File; +import org.apache.hadoop.conf.Configuration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ConnectorConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; +import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; +import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; +import org.apache.ignite.igfs.IgfsIpcEndpointType; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Abstract class for Hadoop tests. 
+ */ +public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every node configured by this test. */ + private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Base REST port. */ + protected static final int REST_PORT = 11212; + + /** IGFS name. */ + protected static final String igfsName = null; + + /** IGFS meta cache name. */ + protected static final String igfsMetaCacheName = "meta"; + + /** IGFS data cache name. */ + protected static final String igfsDataCacheName = "data"; + + /** IGFS block size. */ + protected static final int igfsBlockSize = 1024; + + /** IGFS block group size. */ + protected static final int igfsBlockGroupSize = 8; + + /** Next REST port to assign; starts at REST_PORT and is incremented for each REST-enabled node configuration. */ + private int restPort = REST_PORT; + + /** Secondary file system REST endpoint configuration. */ + protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; + + static { + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + + + /** Initial classpath. */ + private static String initCp; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // Add surefire classpath to regular classpath. + initCp = System.getProperty("java.class.path"); + + String surefireCp = System.getProperty("surefire.test.class.path"); + + if (surefireCp != null) + System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp); + + super.beforeTestsStarted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + // Restore classpath. 
+ System.setProperty("java.class.path", initCp); + + initCp = null; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setHadoopConfiguration(hadoopConfiguration(gridName)); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + if (igfsEnabled()) { + cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration()); + + cfg.setFileSystemConfiguration(igfsConfiguration()); + } + + if (restEnabled()) { + ConnectorConfiguration clnCfg = new ConnectorConfiguration(); + + clnCfg.setPort(restPort++); + + cfg.setConnectorConfiguration(clnCfg); + } + + cfg.setLocalHost("127.0.0.1"); + cfg.setPeerClassLoadingEnabled(false); + + return cfg; + } + + /** + * Creates Hadoop configuration for a test node. + * + * @param gridName Grid name. + * @return Hadoop configuration. + */ + public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = new HadoopConfiguration(); + + cfg.setMaxParallelTasks(3); + + return cfg; + } + + /** + * @return IGFS configuration. + * @throws Exception If failed. + */ + public FileSystemConfiguration igfsConfiguration() throws Exception { + FileSystemConfiguration cfg = new FileSystemConfiguration(); + + cfg.setName(igfsName); + cfg.setBlockSize(igfsBlockSize); + cfg.setDataCacheName(igfsDataCacheName); + cfg.setMetaCacheName(igfsMetaCacheName); + cfg.setFragmentizerEnabled(false); + + return cfg; + } + + /** + * @return IGFS meta cache configuration. 
+ */ + public CacheConfiguration metaCacheConfiguration() { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(igfsMetaCacheName); + cfg.setCacheMode(REPLICATED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** + * @return IGFS data cache configuration. + */ + private CacheConfiguration dataCacheConfiguration() { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(igfsDataCacheName); + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize)); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** + * @return {@code True} if IGFS is enabled on Hadoop nodes. + */ + protected boolean igfsEnabled() { + return false; + } + + /** + * @return {@code True} if REST is enabled on Hadoop nodes. + */ + protected boolean restEnabled() { + return false; + } + + /** + * @return Number of nodes to start. + */ + protected int gridCount() { + return 3; + } + + /** + * Makes IGFS the default file system in the given Hadoop configuration and registers the IGFS implementation classes. + * + * @param cfg Config. + */ + protected void setupFileSystems(Configuration cfg) { + cfg.set("fs.defaultFS", igfsScheme()); + cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); + cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem. + class.getName()); + + HadoopFileSystemsUtils.setupFileSystems(cfg); + } + + /** + * @return IGFS scheme for test. 
+ */ + protected String igfsScheme() { + return "igfs://:" + getTestGridName(0) + "@/"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java new file mode 100644 index 0000000..e45c127 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.base.Joiner; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.processors.igfs.IgfsEx; + +/** + * Abstract class for tests based on WordCount test job. + */ +public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest { + /** Input path. */ + protected static final String PATH_INPUT = "/input"; + + /** Output path. */ + protected static final String PATH_OUTPUT = "/output"; + + /** IGFS instance. */ + protected IgfsEx igfs; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + // Init cache by correct LocalFileSystem implementation + FileSystem.getLocal(cfg); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** + * Generates test file. + * + * @param path File name. + * @param wordCounts Words and counts. + * @throws Exception If failed. + */ + protected void generateTestFile(String path, Object... 
wordCounts) throws Exception { + List<String> wordsArr = new ArrayList<>(); + + //Generating + for (int i = 0; i < wordCounts.length; i += 2) { + String word = (String) wordCounts[i]; + int cnt = (Integer) wordCounts[i + 1]; + + while (cnt-- > 0) + wordsArr.add(word); + } + + //Shuffling + for (int i = 0; i < wordsArr.size(); i++) { + int j = (int)(Math.random() * wordsArr.size()); + + Collections.swap(wordsArr, i, j); + } + + //Input file preparing + PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true)); + + int j = 0; + + while (j < wordsArr.size()) { + int i = 5 + (int)(Math.random() * 5); + + List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); + j += i; + + testInputFileWriter.println(Joiner.on(' ').join(subList)); + } + + testInputFileWriter.close(); + } + + /** + * Reads and sorts the file as plain text, i.e. without SequenceFile decoding (passes a null configuration). + * + * @param fileName The file. + * @return The sorted file contents, human-readable. + * @throws Exception On error. + */ + protected String readAndSortFile(String fileName) throws Exception { + return readAndSortFile(fileName, null); + } + + /** + * Reads all lines of the file, sorts them and joins them with line feeds into a single String. + * + * @param fileName Name of the file to read. + * @param conf Hadoop configuration; when it is non-null and FileOutputFormat.COMPRESS is true the file is read as a SequenceFile of Text keys and IntWritable values, otherwise it is read as plain text from IGFS. + * @return Sorted content of the file as String value. + * @throws Exception If could not read the file. 
+ */ + protected String readAndSortFile(String fileName, Configuration conf) throws Exception { + final List<String> list = new ArrayList<>(); + + final boolean snappyDecode = conf != null && conf.getBoolean(FileOutputFormat.COMPRESS, false); + + if (snappyDecode) { + try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(new Path(fileName)))) { + Text key = new Text(); + + IntWritable val = new IntWritable(); + + while (reader.next(key, val)) + list.add(key + "\t" + val); + } + } + else { + try (InputStream is0 = igfs.open(new IgfsPath(fileName))) { + BufferedReader reader = new BufferedReader(new InputStreamReader(is0)); + + String line; + + while ((line = reader.readLine()) != null) + list.add(line); + } + } + + Collections.sort(list); + + return Joiner.on('\n').join(list) + "\n"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java new file mode 100644 index 0000000..e202f48 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import javax.security.auth.AuthPermission; +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.ignite.internal.processors.hadoop.deps.CircularWIthHadoop; +import org.apache.ignite.internal.processors.hadoop.deps.CircularWithoutHadoop; +import org.apache.ignite.internal.processors.hadoop.deps.WithIndirectField; +import org.apache.ignite.internal.processors.hadoop.deps.WithCast; +import org.apache.ignite.internal.processors.hadoop.deps.WithClassAnnotation; +import org.apache.ignite.internal.processors.hadoop.deps.WithConstructorInvocation; +import org.apache.ignite.internal.processors.hadoop.deps.WithMethodCheckedException; +import org.apache.ignite.internal.processors.hadoop.deps.WithMethodRuntimeException; +import org.apache.ignite.internal.processors.hadoop.deps.WithExtends; +import org.apache.ignite.internal.processors.hadoop.deps.WithField; +import org.apache.ignite.internal.processors.hadoop.deps.WithImplements; +import org.apache.ignite.internal.processors.hadoop.deps.WithInitializer; +import org.apache.ignite.internal.processors.hadoop.deps.WithInnerClass; +import org.apache.ignite.internal.processors.hadoop.deps.WithLocalVariable; +import org.apache.ignite.internal.processors.hadoop.deps.WithMethodAnnotation; +import org.apache.ignite.internal.processors.hadoop.deps.WithMethodInvocation; +import org.apache.ignite.internal.processors.hadoop.deps.WithMethodArgument; +import 
org.apache.ignite.internal.processors.hadoop.deps.WithMethodReturnType; +import org.apache.ignite.internal.processors.hadoop.deps.WithOuterClass; +import org.apache.ignite.internal.processors.hadoop.deps.WithParameterAnnotation; +import org.apache.ignite.internal.processors.hadoop.deps.WithStaticField; +import org.apache.ignite.internal.processors.hadoop.deps.WithStaticInitializer; +import org.apache.ignite.internal.processors.hadoop.deps.Without; + +/** + * Tests for Hadoop classloader. + */ +public class HadoopClassLoaderTest extends TestCase { + /** Class loader under test. */ + final HadoopClassLoader ldr = new HadoopClassLoader(null, "test", null, new HadoopHelperImpl()); + + /** + * Checks that the two Circular* classes are loaded as new Class objects by the Hadoop class loader, while Without resolves to the same Class object the test already sees. + * + * @throws Exception If failed. + */ + public void testClassLoading() throws Exception { + assertNotSame(CircularWIthHadoop.class, ldr.loadClass(CircularWIthHadoop.class.getName())); + assertNotSame(CircularWithoutHadoop.class, ldr.loadClass(CircularWithoutHadoop.class.getName())); + + assertSame(Without.class, ldr.loadClass(Without.class.getName())); + } + + /** + * Test dependency search. 
+ */ + public void testDependencySearch() { + // Positive cases: + final Class[] positiveClasses = { + Configuration.class, + HadoopUtils.class, + WithStaticField.class, + WithCast.class, + WithClassAnnotation.class, + WithConstructorInvocation.class, + WithMethodCheckedException.class, + WithMethodRuntimeException.class, + WithExtends.class, + WithField.class, + WithImplements.class, + WithInitializer.class, + WithInnerClass.class, + WithOuterClass.InnerNoHadoop.class, + WithLocalVariable.class, + WithMethodAnnotation.class, + WithMethodInvocation.class, + WithMethodArgument.class, + WithMethodReturnType.class, + WithParameterAnnotation.class, + WithStaticField.class, + WithStaticInitializer.class, + WithIndirectField.class, + CircularWIthHadoop.class, + CircularWithoutHadoop.class, + }; + + for (Class c : positiveClasses) + assertTrue(c.getName(), ldr.hasExternalDependencies(c.getName())); + + // Negative cases: + final Class[] negativeClasses = { + Object.class, + AuthPermission.class, + Without.class, + }; + + for (Class c : negativeClasses) + assertFalse(c.getName(), ldr.hasExternalDependencies(c.getName())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java new file mode 100644 index 0000000..7ee318a --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.base.Joiner; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileFilter; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; +import org.apache.ignite.igfs.IgfsInputStream; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import 
org.jsr166.ConcurrentHashMap8; + +/** + * Test of integration with Hadoop client via command line interface. + */ +public class HadoopCommandLineTest extends GridCommonAbstractTest { + /** IGFS instance. */ + private IgfsEx igfs; + + /** IGFS name. */ + private static final String igfsName = "igfs"; + + /** Temporary working directory that holds the generated Hadoop configs and test data files. */ + private static File testWorkDir; + + /** Hadoop home directory, read from the HADOOP_HOME property. */ + private static String hadoopHome; + + /** Hive home directory, read from the HIVE_HOME property. */ + private static String hiveHome; + + /** Hadoop MapReduce examples jar located under the Hadoop home directory. */ + private static File examplesJar; + + /** + * Generates test file. + * + * @param path File name. + * @param wordCounts Words and counts. + * @throws Exception If failed. + */ + private void generateTestFile(File path, Object... wordCounts) throws Exception { + List<String> wordsArr = new ArrayList<>(); + + //Generating + for (int i = 0; i < wordCounts.length; i += 2) { + String word = (String) wordCounts[i]; + int cnt = (Integer) wordCounts[i + 1]; + + while (cnt-- > 0) + wordsArr.add(word); + } + + //Shuffling + for (int i = 0; i < wordsArr.size(); i++) { + int j = (int)(Math.random() * wordsArr.size()); + + Collections.swap(wordsArr, i, j); + } + + //Writing file + try (PrintWriter writer = new PrintWriter(path)) { + int j = 0; + + while (j < wordsArr.size()) { + int i = 5 + (int)(Math.random() * 5); + + List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); + j += i; + + writer.println(Joiner.on(' ').join(subList)); + } + + writer.flush(); + } + } + + /** + * Generates two data files to be joined with Hive. + * + * @throws FileNotFoundException If failed. 
+ */ + private void generateHiveTestFiles() throws FileNotFoundException { + try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a")); + PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) { + char sep = '\t'; + + int idB = 0; + int idA = 0; + int v = 1000; + + for (int i = 0; i < 1000; i++) { + writerA.print(idA++); + writerA.print(sep); + writerA.println(idB); + + writerB.print(idB++); + writerB.print(sep); + writerB.println(v += 2); + + writerB.print(idB++); + writerB.print(sep); + writerB.println(v += 2); + } + + writerA.flush(); + writerB.flush(); + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + hiveHome = IgniteSystemProperties.getString("HIVE_HOME"); + + assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome)); + + hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME"); + + assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome)); + + String mapredHome = hadoopHome + "/share/hadoop/mapreduce"; + + File[] fileList = new File(mapredHome).listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().startsWith("hadoop-mapreduce-examples-") && + pathname.getName().endsWith(".jar"); + } + }); + + assertEquals("Invalid hadoop distribution.", 1, fileList.length); + + examplesJar = fileList[0]; + + testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile(); + + U.copy(resolveHadoopConfig("core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false); + + File srcFile = resolveHadoopConfig("mapred-site.ignite.xml"); + File dstFile = new File(testWorkDir, "mapred-site.xml"); + + try (BufferedReader in = new BufferedReader(new FileReader(srcFile)); + PrintWriter out = new PrintWriter(dstFile)) { + String line; + + while ((line = in.readLine()) != null) { + if (line.startsWith("</configuration>")) + out.println( + " <property>\n" + + " <name>" + 
HadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" + + " <value>" + IgniteHadoopFileSystemCounterWriter.class.getName() + "</value>\n" + + " </property>\n"); + + out.println(line); + } + + out.flush(); + } + + generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50); + + generateHiveTestFiles(); + } + + /** + * Resolve Hadoop configuration file. + * + * @param name File name. + * @return Resolved file. + */ + private static File resolveHadoopConfig(String name) { + File path = U.resolveIgnitePath("modules/hadoop/config/" + name); + + return path != null ? path : U.resolveIgnitePath("config/hadoop/" + name); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + U.delete(testWorkDir); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + String cfgPath = "config/hadoop/default-config.xml"; + + IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath); + + IgniteConfiguration cfg = tup.get1(); + + cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes. + + igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** + * Creates the process builder with appropriate environment to run Hadoop CLI. + * + * @return Process builder. 
+ */ + private ProcessBuilder createProcessBuilder() { + String sep = ":"; + + String ggClsPath = HadoopJob.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + + HadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + + ConcurrentHashMap8.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + + ProcessBuilder res = new ProcessBuilder(); + + res.environment().put("HADOOP_HOME", hadoopHome); + res.environment().put("HADOOP_CLASSPATH", ggClsPath); + res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath()); + + res.redirectErrorStream(true); + + return res; + } + + /** + * Waits for process exit and logs its output. + * + * @param proc Process. + * @return Exit code. + * @throws Exception If failed. + */ + private int watchProcess(Process proc) throws Exception { + BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream())); + + String line; + + while ((line = reader.readLine()) != null) + log().info(line); + + return proc.waitFor(); + } + + /** + * Executes Hadoop command line tool. + * + * @param args Arguments for Hadoop command line tool. + * @return Process exit code. + * @throws Exception If failed. + */ + private int executeHadoopCmd(String... args) throws Exception { + ProcessBuilder procBuilder = createProcessBuilder(); + + List<String> cmd = new ArrayList<>(); + + cmd.add(hadoopHome + "/bin/hadoop"); + cmd.addAll(Arrays.asList(args)); + + procBuilder.command(cmd); + + log().info("Execute: " + procBuilder.command()); + + return watchProcess(procBuilder.start()); + } + + /** + * Executes Hive query. + * + * @param qry Query. + * @return Process exit code. + * @throws Exception If failed. 
+ */ + private int executeHiveQuery(String qry) throws Exception { + ProcessBuilder procBuilder = createProcessBuilder(); + + List<String> cmd = new ArrayList<>(); + + procBuilder.command(cmd); + + cmd.add(hiveHome + "/bin/hive"); + + cmd.add("--hiveconf"); + cmd.add("hive.rpc.query.plan=true"); + + cmd.add("--hiveconf"); + cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" + + "databaseName=metastore_db;create=true"); + + cmd.add("-e"); + cmd.add(qry); + + procBuilder.command(cmd); + + log().info("Execute: " + procBuilder.command()); + + return watchProcess(procBuilder.start()); + } + + /** + * Tests Hadoop command line integration. + */ + public void testHadoopCommandLine() throws Exception { + assertEquals(0, executeHadoopCmd("fs", "-ls", "/")); + + assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input")); + + assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input")); + + assertTrue(igfs.exists(new IgfsPath("/input/test-data"))); + + assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output")); + + IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/"); + + assertTrue(igfs.exists(path)); + + IgfsPath jobStatPath = null; + + for (IgfsPath jobPath : igfs.listPaths(path)) { + assertNull(jobStatPath); + + jobStatPath = jobPath; + } + + File locStatFile = new File(testWorkDir, "performance"); + + assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString())); + + long evtCnt = HadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile))); + + assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner. 
+ + assertTrue(igfs.exists(new IgfsPath("/output"))); + + BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000")))); + + List<String> res = new ArrayList<>(); + + String line; + + while ((line = in.readLine()) != null) + res.add(line); + + Collections.sort(res); + + assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString()); + } + + /** + * Runs query and checks result. + * + * @param expRes Expected result. + * @param qry Query. + * @throws Exception If failed. + */ + private void checkQuery(String expRes, String qry) throws Exception { + assertEquals(0, executeHiveQuery("drop table if exists result")); + + assertEquals(0, executeHiveQuery( + "create table result " + + "row format delimited fields terminated by ' ' " + + "stored as textfile " + + "location '/result' as " + qry + )); + + IgfsInputStream in = igfs.open(new IgfsPath("/result/000000_0")); + + byte[] buf = new byte[(int) in.length()]; + + in.read(buf); + + assertEquals(expRes, new String(buf)); + } + + /** + * Tests Hive integration. 
+ */ + public void testHiveCommandLine() throws Exception { + assertEquals(0, executeHiveQuery( + "create table table_a (" + + "id_a int," + + "id_b int" + + ") " + + "row format delimited fields terminated by '\\t'" + + "stored as textfile " + + "location '/table-a'" + )); + + assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a")); + + assertEquals(0, executeHiveQuery( + "create table table_b (" + + "id_b int," + + "rndv int" + + ") " + + "row format delimited fields terminated by '\\t'" + + "stored as textfile " + + "location '/table-b'" + )); + + assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b")); + + checkQuery( + "0 0\n" + + "1 2\n" + + "2 4\n" + + "3 6\n" + + "4 8\n" + + "5 10\n" + + "6 12\n" + + "7 14\n" + + "8 16\n" + + "9 18\n", + "select * from table_a order by id_a limit 10" + ); + + checkQuery("2000\n", "select count(id_b) from table_b"); + + checkQuery( + "250 500 2002\n" + + "251 502 2006\n" + + "252 504 2010\n" + + "253 506 2014\n" + + "254 508 2018\n" + + "255 510 2022\n" + + "256 512 2026\n" + + "257 514 2030\n" + + "258 516 2034\n" + + "259 518 2038\n", + "select a.id_a, a.id_b, b.rndv" + + " from table_a a" + + " inner join table_b b on a.id_b = b.id_b" + + " where b.rndv > 2000" + + " order by a.id_a limit 10" + ); + + checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b"); + } +} \ No newline at end of file