http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java new file mode 100644 index 0000000..f542cf2 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; + +/** + * Context for test purpose. + */ +class HadoopTestTaskContext extends HadoopV2TaskContext { + /** + * Simple key-vale pair. + * @param <K> Key class. + * @param <V> Value class. + */ + public static class Pair<K,V> { + /** Key */ + private K key; + + /** Value */ + private V val; + + /** + * @param key key. + * @param val value. + */ + Pair(K key, V val) { + this.key = key; + this.val = val; + } + + /** + * Getter of key. + * @return key. + */ + K key() { + return key; + } + + /** + * Getter of value. + * @return value. + */ + V value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return key + "," + val; + } + } + + /** Mock output container- result data of task execution if it is not overridden. */ + private List<Pair<String, Integer>> mockOutput = new ArrayList<>(); + + /** Mock input container- input data if it is not overridden. */ + private Map<Object,List> mockInput = new TreeMap<>(); + + /** Context output implementation to write data into mockOutput. 
*/ + private HadoopTaskOutput output = new HadoopTaskOutput() { + /** {@inheritDoc} */ + @Override public void write(Object key, Object val) { + //Check of casting and extract/copy values + String strKey = new String(((Text)key).getBytes()); + int intVal = ((IntWritable)val).get(); + + mockOutput().add(new Pair<>(strKey, intVal)); + } + + /** {@inheritDoc} */ + @Override public void close() { + throw new UnsupportedOperationException(); + } + }; + + /** Context input implementation to read data from mockInput. */ + private HadoopTaskInput input = new HadoopTaskInput() { + /** Iterator of keys and associated lists of values. */ + Iterator<Map.Entry<Object, List>> iter; + + /** Current key and associated value list. */ + Map.Entry<Object, List> currEntry; + + /** {@inheritDoc} */ + @Override public boolean next() { + if (iter == null) + iter = mockInput().entrySet().iterator(); + + if (iter.hasNext()) + currEntry = iter.next(); + else + currEntry = null; + + return currEntry != null; + } + + /** {@inheritDoc} */ + @Override public Object key() { + return currEntry.getKey(); + } + + /** {@inheritDoc} */ + @Override public Iterator<?> values() { + return currEntry.getValue().iterator() ; + } + + /** {@inheritDoc} */ + @Override public void close() { + throw new UnsupportedOperationException(); + } + }; + + /** + * Getter of mock output container - result of task if it is not overridden. + * + * @return mock output. + */ + public List<Pair<String, Integer>> mockOutput() { + return mockOutput; + } + + /** + * Getter of mock input container- input data if it is not overridden. + * + * @return mock output. + */ + public Map<Object, List> mockInput() { + return mockInput; + } + + /** + * Generate one-key-multiple-values tree from array of key-value pairs, and wrap its into Writable objects. + * The result is placed into mock input. + * + * @param flatData list of key-value pair. 
+ */ + public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) { + Text key = new Text(); + + for (HadoopTestTaskContext.Pair<String, Integer> pair : flatData) { + key.set(pair.key); + ArrayList<IntWritable> valList; + + if (!mockInput.containsKey(key)) { + valList = new ArrayList<>(); + mockInput.put(key, valList); + key = new Text(); + } + else + valList = (ArrayList<IntWritable>) mockInput.get(key); + valList.add(new IntWritable(pair.value())); + } + } + + /** + * @param taskInfo Task info. + * @param gridJob Grid Hadoop job. + */ + public HadoopTestTaskContext(HadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException { + super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob)); + } + + /** + * Creates DataInput to read JobConf. + * + * @param job Job. + * @return DataInput with JobConf. + * @throws IgniteCheckedException If failed. + */ + private static DataInput jobConfDataInput(HadoopJob job) throws IgniteCheckedException { + JobConf jobConf = new JobConf(); + + for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet()) + jobConf.set(e.getKey(), e.getValue()); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + try { + jobConf.write(new DataOutputStream(buf)); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + return new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + } + + /** {@inheritDoc} */ + @Override public HadoopTaskOutput output() { + return output; + } + + /** {@inheritDoc} */ + @Override public HadoopTaskInput input() { + return input; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java new file mode 100644 index 0000000..da0d922 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Utility class for tests. 
+ */ +public class HadoopTestUtils { + /** Base test directory. */ + private static final File BASE_TEST_DIR = new File(U.getIgniteHome() + "/work/test/hadoop/"); + + /** + * @return Base directory for tests. + */ + public static File baseTestDir() { + return BASE_TEST_DIR; + } + + /** + * Get test directory. + * + * @param parts Parts. + * @return Directory. + */ + public static File testDir(String... parts) { + File res = BASE_TEST_DIR; + + if (parts != null) { + for (String part : parts) + res = new File(res, part); + } + + return res; + } + + /** + * Clear base test directory. + */ + public static void clearBaseTestDir() { + if (baseTestDir().exists()) + assert delete(baseTestDir()); + } + + /** + * Checks that job statistics file contains valid strings only. + * + * @param reader Buffered reader to get lines of job statistics. + * @return Amount of events. + * @throws IOException If failed. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException { + Collection<String> phases = new HashSet<>(); + + phases.add("submit"); + phases.add("prepare"); + phases.add("start"); + phases.add("finish"); + phases.add("requestId"); + phases.add("responseId"); + + Collection<String> evtTypes = new HashSet<>(); + + evtTypes.add("JOB"); + evtTypes.add("SETUP"); + evtTypes.add("MAP"); + evtTypes.add("SHUFFLE"); + evtTypes.add("REDUCE"); + evtTypes.add("COMBINE"); + evtTypes.add("COMMIT"); + + long evtCnt = 0; + String line; + + Map<Long, String> reduceNodes = new HashMap<>(); + + while((line = reader.readLine()) != null) { + String[] splitLine = line.split(":"); + + //Try parse timestamp + Long.parseLong(splitLine[1]); + + String[] evt = splitLine[0].split(" "); + + assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0])); + + String phase; + + if ("JOB".equals(evt[0])) + phase = evt[1]; + else { + assertEquals(4, evt.length); + assertTrue("The node id is not defined", 
!F.isEmpty(evt[3])); + + long taskNum = Long.parseLong(evt[1]); + + if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) { + String nodeId = reduceNodes.get(taskNum); + + if (nodeId == null) + reduceNodes.put(taskNum, evt[3]); + else + assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]); + } + + phase = evt[2]; + } + + assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase)); + + evtCnt++; + } + + return evtCnt; + } + + /** + * Deletes file or directory with all sub-directories and files. + * + * @param file File or directory to delete. + * @return {@code true} if and only if the file or directory is successfully deleted, + * {@code false} otherwise + */ + public static boolean delete(@Nullable File file) { + if (file == null) + return false; + + boolean res = true; + + if (file.isDirectory()) { + File[] files = file.listFiles(); + + if (files != null && files.length > 0) + for (File file1 : files) + if (file1.isDirectory()) + res &= delete(file1); + else + res &= file1.delete(); + + res &= file.delete(); + } + else + res = file.delete(); + + return res; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopUserLibsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopUserLibsSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopUserLibsSelfTest.java new file mode 100644 index 0000000..9e3c8f4 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopUserLibsSelfTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +/** + * Tests for user libs parsing. + */ +public class HadoopUserLibsSelfTest extends GridCommonAbstractTest { + /** Directory 1. */ + private static final File DIR_1 = HadoopTestUtils.testDir("dir1"); + + /** File 1 in directory 1. */ + private static final File FILE_1_1 = new File(DIR_1, "file1.jar"); + + /** File 2 in directory 1. */ + private static final File FILE_1_2 = new File(DIR_1, "file2.jar"); + + /** Directory 2. */ + private static final File DIR_2 = HadoopTestUtils.testDir("dir2"); + + /** File 1 in directory 2. */ + private static final File FILE_2_1 = new File(DIR_2, "file1.jar"); + + /** File 2 in directory 2. */ + private static final File FILE_2_2 = new File(DIR_2, "file2.jar"); + + /** Missing directory. */ + private static final File MISSING_DIR = HadoopTestUtils.testDir("missing_dir"); + + /** Missing file. 
*/ + private static final File MISSING_FILE = new File(MISSING_DIR, "file.jar"); + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + HadoopTestUtils.clearBaseTestDir(); + + assert DIR_1.mkdirs(); + assert DIR_2.mkdirs(); + + assert FILE_1_1.createNewFile(); + assert FILE_1_2.createNewFile(); + assert FILE_2_1.createNewFile(); + assert FILE_2_2.createNewFile(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + // Sanity checks before test start. + ensureExists(FILE_1_1); + ensureExists(FILE_1_2); + ensureExists(FILE_2_1); + ensureExists(FILE_2_2); + + ensureNotExists(MISSING_DIR); + ensureNotExists(MISSING_FILE); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + HadoopTestUtils.clearBaseTestDir(); + } + + /** + * Test null or empty user libs. + * + * @throws Exception If failed. + */ + public void testNullOrEmptyUserLibs() throws Exception { + assert parse(null).isEmpty(); + assert parse("").isEmpty(); + } + + /** + * Test single file. + * + * @throws Exception If failed. + */ + public void testSingle() throws Exception { + Collection<File> res = parse(single(FILE_1_1)); + + assert res.size() == 1; + assert res.contains(FILE_1_1); + + res = parse(single(MISSING_FILE)); + + assert res.size() == 0; + } + + /** + * Test multiple files. + * + * @throws Exception If failed. + */ + public void testMultiple() throws Exception { + Collection<File> res = + parse(merge(single(FILE_1_1), single(FILE_1_2), single(FILE_2_1), single(FILE_2_2), single(MISSING_FILE))); + + assert res.size() == 4; + assert res.contains(FILE_1_1); + assert res.contains(FILE_1_2); + assert res.contains(FILE_2_1); + assert res.contains(FILE_2_2); + } + + /** + * Test single wildcard. + * + * @throws Exception If failed. 
+ */ + public void testSingleWildcard() throws Exception { + Collection<File> res = parse(wildcard(DIR_1)); + + assert res.size() == 2; + assert res.contains(FILE_1_1); + assert res.contains(FILE_1_2); + + res = parse(wildcard(MISSING_DIR)); + + assert res.size() == 0; + } + + /** + * Test multiple wildcards. + * + * @throws Exception If failed. + */ + public void testMultipleWildcards() throws Exception { + Collection<File> res = parse(merge(wildcard(DIR_1), wildcard(DIR_2), wildcard(MISSING_DIR))); + + assert res.size() == 4; + assert res.contains(FILE_1_1); + assert res.contains(FILE_1_2); + assert res.contains(FILE_2_1); + assert res.contains(FILE_2_2); + } + + /** + * Test mixed tokens. + * + * @throws Exception If failed. + */ + public void testMixed() throws Exception { + String str = merge( + single(FILE_1_1), + wildcard(DIR_2), + single(MISSING_FILE), + wildcard(MISSING_DIR) + ); + + Collection<File> res = parse(str); + + assert res.size() == 3; + assert res.contains(FILE_1_1); + assert res.contains(FILE_2_1); + assert res.contains(FILE_2_2); + } + /** + * Ensure provided file exists. + * + * @param file File. + */ + private static void ensureExists(File file) { + assert file.exists(); + } + + /** + * Ensure provided file doesn't exist. + * + * @param file File. + */ + private static void ensureNotExists(File file) { + assert !file.exists(); + } + + /** + * Merge string using path separator. + * + * @param vals Values. + * @return Result. + */ + private static String merge(String... vals) { + StringBuilder res = new StringBuilder(); + + if (vals != null) { + boolean first = true; + + for (String val : vals) { + if (first) + first = false; + else + res.append(File.pathSeparatorChar); + + res.append(val); + } + } + + return res.toString(); + } + + /** + * Parse string. + * + * @param str String. + * @return Files. + * @throws IOException If failed. 
+ */ + Collection<File> parse(String str) throws IOException { + Collection<HadoopClasspathUtils.SearchDirectory> dirs = HadoopClasspathUtils.parseUserLibs(str); + + Collection<File> res = new HashSet<>(); + + for (HadoopClasspathUtils.SearchDirectory dir : dirs) + Collections.addAll(res, dir.files()); + + return res; + } + + /** + * Get absolute path to a single file. + * + * @param file File. + * @return Path. + */ + private static String single(File file) { + return file.getAbsolutePath(); + } + + /** + * Create a wildcard. + * + * @param file File. + * @return Wildcard. + */ + private static String wildcard(File file) { + return file.getAbsolutePath() + File.separatorChar + "*"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java new file mode 100644 index 0000000..ae2c00d --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.InputStream; +import java.util.UUID; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.hadoop.mapred.JobConf; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopSerializationWrapper; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; + +/** + * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job}. + */ +public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { + /** */ + private static final String TEST_SERIALIZED_VALUE = "Test serialized value"; + + /** + * Custom serialization class that accepts {@link Writable}. 
+ */ + private static class CustomSerialization extends WritableSerialization { + /** {@inheritDoc} */ + @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) { + return new Deserializer<Writable>() { + @Override public void open(InputStream in) { } + + @Override public Writable deserialize(Writable writable) { + return new Text(TEST_SERIALIZED_VALUE); + } + + @Override public void close() { } + }; + } + } + + /** + * Tests that {@link HadoopJob} provides wrapped serializer if it's set in configuration. + * + * @throws IgniteCheckedException If fails. + */ + public void testCustomSerializationApplying() throws IgniteCheckedException { + JobConf cfg = new JobConf(); + + cfg.setMapOutputKeyClass(IntWritable.class); + cfg.setMapOutputValueClass(Text.class); + cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); + + HadoopDefaultJobInfo info = createJobInfo(cfg); + + final UUID uuid = UUID.randomUUID(); + + HadoopJobId id = new HadoopJobId(uuid, 1); + + HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null); + + HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, + null)); + + HadoopSerialization ser = taskCtx.keySerialization(); + + assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName()); + + DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0])); + + assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); + + ser = taskCtx.valueSerialization(); + + assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName()); + + assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java new file mode 100644 index 0000000..1496150 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Configuration validation tests. + */ +public class HadoopValidationSelfTest extends HadoopAbstractSelfTest { + /** Peer class loading enabled flag. 
*/ + public boolean peerClassLoading; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + + peerClassLoading = false; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(peerClassLoading); + + return cfg; + } + + /** + * Ensure that Grid starts when all configuration parameters are valid. + * + * @throws Exception If failed. + */ + public void testValid() throws Exception { + startGrids(1); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java new file mode 100644 index 0000000..4e7cc50 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; +import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock; +import org.apache.ignite.internal.processors.igfs.IgfsMock; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; + +/** + * Tests for weighted map-reduce planned. + */ +public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest { + /** ID 1. */ + private static final UUID ID_1 = new UUID(0, 1); + + /** ID 2. */ + private static final UUID ID_2 = new UUID(0, 2); + + /** ID 3. */ + private static final UUID ID_3 = new UUID(0, 3); + + /** MAC 1. */ + private static final String MAC_1 = "mac1"; + + /** MAC 2. 
*/ + private static final String MAC_2 = "mac2"; + + /** MAC 3. */ + private static final String MAC_3 = "mac3"; + + /** Host 1. */ + private static final String HOST_1 = "host1"; + + /** Host 2. */ + private static final String HOST_2 = "host2"; + + /** Host 3. */ + private static final String HOST_3 = "host3"; + + /** Host 4. */ + private static final String HOST_4 = "host4"; + + /** Host 5. */ + private static final String HOST_5 = "host5"; + + /** Standard node 1. */ + private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1); + + /** Standard node 2. */ + private static final MockNode NODE_2 = new MockNode(ID_2, MAC_2, HOST_2); + + /** Standard node 3. */ + private static final MockNode NODE_3 = new MockNode(ID_3, MAC_3, HOST_3); + + /** Standard nodes. */ + private static final Collection<ClusterNode> NODES; + + /** + * Static initializer. + */ + static { + NODES = new ArrayList<>(); + + NODES.add(NODE_1); + NODES.add(NODE_2); + NODES.add(NODE_3); + } + + /** + * Test one IGFS split being assigned to affinity node. + * + * @throws Exception If failed. + */ + public void testOneIgfsSplitAffinity() throws Exception { + IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); + + List<HadoopInputSplit> splits = new ArrayList<>(); + + splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("igfs://igfs@/file"), 0, 50)); + + final int expReducers = 4; + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); + + IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); + + HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); + + assert plan.mappers() == 1; + assert plan.mapperNodeIds().size() == 1; + assert plan.mapperNodeIds().contains(ID_1); + + checkPlanMappers(plan, splits, NODES, false/*only 1 split*/); + checkPlanReducers(plan, NODES, expReducers, false/* because of threshold behavior.*/); + } + + /** + * Test one HDFS splits. 
+ * + * @throws Exception If failed. + */ + public void testHdfsSplitsAffinity() throws Exception { + IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); + + final List<HadoopInputSplit> splits = new ArrayList<>(); + + splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50)); + splits.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100)); + splits.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37)); + + // The following splits belong to hosts that are out of Ignite topology at all. + // This means that these splits should be assigned to any least loaded modes: + splits.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2)); + splits.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3)); + + final int expReducers = 7; + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); + + IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); + + final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); + + checkPlanMappers(plan, splits, NODES, true); + + checkPlanReducers(plan, NODES, expReducers, true); + } + + /** + * Test HDFS splits with Replication == 3. + * + * @throws Exception If failed. 
+ */ + public void testHdfsSplitsReplication() throws Exception { + IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); + + final List<HadoopInputSplit> splits = new ArrayList<>(); + + splits.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50)); + splits.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100)); + splits.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37)); + // The following splits belong to hosts that are out of Ignite topology at all. + // This means that these splits should be assigned to any least loaded modes: + splits.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2)); + splits.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3)); + + final int expReducers = 8; + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); + + IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); + + final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); + + checkPlanMappers(plan, splits, NODES, true); + + checkPlanReducers(plan, NODES, expReducers, true); + } + + /** + * Get all IDs. + * + * @param nodes Nodes. + * @return IDs. + */ + private static Set<UUID> allIds(Collection<ClusterNode> nodes) { + Set<UUID> allIds = new HashSet<>(); + + for (ClusterNode n : nodes) + allIds.add(n.id()); + + return allIds; + } + + /** + * Check mappers for the plan. + * + * @param plan Plan. + * @param splits Splits. + * @param nodes Nodes. + * @param expectUniformity WHether uniformity is expected. 
+ */ + private static void checkPlanMappers(HadoopMapReducePlan plan, List<HadoopInputSplit> splits, + Collection<ClusterNode> nodes, boolean expectUniformity) { + // Number of mappers should correspomd to the number of input splits: + assertEquals(splits.size(), plan.mappers()); + + if (expectUniformity) { + // mappers are assigned to all available nodes: + assertEquals(nodes.size(), plan.mapperNodeIds().size()); + + + assertEquals(allIds(nodes), plan.mapperNodeIds()); + } + + // Check all splits are covered by mappers: + Set<HadoopInputSplit> set = new HashSet<>(); + + for (UUID id: plan.mapperNodeIds()) { + Collection<HadoopInputSplit> sp = plan.mappers(id); + + assert sp != null; + + for (HadoopInputSplit s: sp) + assertTrue(set.add(s)); + } + + // must be of the same size & contain same elements: + assertEquals(set, new HashSet<>(splits)); + } + + /** + * Check plan reducers. + * + * @param plan Plan. + * @param nodes Nodes. + * @param expReducers Expected reducers. + * @param expectUniformity Expected uniformity. + */ + private static void checkPlanReducers(HadoopMapReducePlan plan, + Collection<ClusterNode> nodes, int expReducers, boolean expectUniformity) { + + assertEquals(expReducers, plan.reducers()); + + if (expectUniformity) + assertEquals(allIds(nodes), plan.reducerNodeIds()); + + int sum = 0; + int lenSum = 0; + + for (UUID uuid: plan.reducerNodeIds()) { + int[] rr = plan.reducers(uuid); + + assert rr != null; + + lenSum += rr.length; + + for (int i: rr) + sum += i; + } + + assertEquals(expReducers, lenSum); + + // Numbers in the arrays must be consequtive integers stating from 0, + // check that simply calculating their total sum: + assertEquals((lenSum * (lenSum - 1) / 2), sum); + } + + /** + * Create planner for IGFS. + * + * @param igfs IGFS. + * @return Planner. 
+ */ + private static IgniteHadoopWeightedMapReducePlanner createPlanner(IgfsMock igfs) { + IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner(); + + IgfsIgniteMock ignite = new IgfsIgniteMock(null, igfs); + + GridTestUtils.setFieldValue(planner, HadoopAbstractMapReducePlanner.class, "ignite", ignite); + + return planner; + } + + /** + * Throw {@link UnsupportedOperationException}. + */ + private static void throwUnsupported() { + throw new UnsupportedOperationException("Should not be called!"); + } + + /** + * Mocked node. + */ + private static class MockNode implements ClusterNode { + /** ID. */ + private final UUID id; + + /** MAC addresses. */ + private final String macs; + + /** Addresses. */ + private final List<String> addrs; + + /** + * Constructor. + * + * @param id Node ID. + * @param macs MAC addresses. + * @param addrs Addresses. + */ + public MockNode(UUID id, String macs, String... addrs) { + assert addrs != null; + + this.id = id; + this.macs = macs; + + this.addrs = Arrays.asList(addrs); + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return id; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <T> T attribute(String name) { + if (F.eq(name, IgniteNodeAttributes.ATTR_MACS)) + return (T)macs; + + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<String> addresses() { + return addrs; + } + + /** {@inheritDoc} */ + @Override public Object consistentId() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics metrics() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Map<String, Object> attributes() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<String> hostNames() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public long order() { + 
throwUnsupported(); + + return 0; + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isDaemon() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isClient() { + throwUnsupported(); + + return false; + } + } + + /** + * Locations builder. + */ + private static class LocationsBuilder { + /** Locations. */ + private final TreeMap<Long, Collection<MockNode>> locs = new TreeMap<>(); + + /** + * Create new locations builder. + * + * @return Locations builder. + */ + public static LocationsBuilder create() { + return new LocationsBuilder(); + } + + /** + * Add locations. + * + * @param start Start. + * @param nodes Nodes. + * @return This builder for chaining. + */ + public LocationsBuilder add(long start, MockNode... nodes) { + locs.put(start, Arrays.asList(nodes)); + + return this; + } + + /** + * Build locations. + * + * @return Locations. + */ + public TreeMap<Long, Collection<MockNode>> build() { + return locs; + } + + /** + * Build IGFS. + * + * @return IGFS. + */ + public MockIgfs buildIgfs() { + return new MockIgfs(build()); + } + } + + /** + * Mocked IGFS. + */ + private static class MockIgfs extends IgfsMock { + /** Block locations. */ + private final TreeMap<Long, Collection<MockNode>> locs; + + /** + * Constructor. + * + * @param locs Block locations. 
+ */ + public MockIgfs(TreeMap<Long, Collection<MockNode>> locs) { + super("igfs"); + + this.locs = locs; + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) { + Collection<IgfsBlockLocation> res = new ArrayList<>(); + + long cur = start; + long remaining = len; + + long prevLocStart = -1; + Collection<MockNode> prevLocNodes = null; + + for (Map.Entry<Long, Collection<MockNode>> locEntry : locs.entrySet()) { + long locStart = locEntry.getKey(); + Collection<MockNode> locNodes = locEntry.getValue(); + + if (prevLocNodes != null) { + if (cur < locStart) { + // Add part from previous block. + long prevLen = locStart - prevLocStart; + + res.add(new IgfsBlockLocationMock(cur, prevLen, prevLocNodes)); + + cur = locStart; + remaining -= prevLen; + } + } + + prevLocStart = locStart; + prevLocNodes = locNodes; + + if (remaining == 0) + break; + } + + // Add remainder. + if (remaining != 0) + res.add(new IgfsBlockLocationMock(cur, remaining, prevLocNodes)); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean isProxy(URI path) { + return false; + } + } + + /** + * Mocked block location. + */ + private static class IgfsBlockLocationMock implements IgfsBlockLocation { + /** Start. */ + private final long start; + + /** Length. */ + private final long len; + + /** Node IDs. */ + private final List<UUID> nodeIds; + + /** + * Constructor. + * + * @param start Start. + * @param len Length. + * @param nodes Nodes. 
+ */ + public IgfsBlockLocationMock(long start, long len, Collection<MockNode> nodes) { + this.start = start; + this.len = len; + + this.nodeIds = new ArrayList<>(nodes.size()); + + /* Keep only the node IDs; the node objects themselves are not stored. */ for (MockNode node : nodes) + nodeIds.add(node.id); + } + + /** {@inheritDoc} */ + @Override public long start() { + return start; + } + + /** {@inheritDoc} */ + @Override public long length() { + return len; + } + + /** {@inheritDoc} */ + @Override public Collection<UUID> nodeIds() { + return nodeIds; + } + + /** {@inheritDoc} Not supported by this mock: throwUnsupported() raises {@link UnsupportedOperationException}. */ + @Override public Collection<String> names() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} Not supported by this mock: throwUnsupported() raises {@link UnsupportedOperationException}. */ + @Override public Collection<String> hosts() { + throwUnsupported(); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java new file mode 100644 index 0000000..e0403c2 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop;

import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;

/**
 * Tests whole map-reduce execution with the weighted planner.
 */
public class HadoopWeightedPlannerMapReduceTest extends HadoopMapReduceTest {
    /** {@inheritDoc} */
    @Override protected HadoopConfiguration createHadoopConfiguration() {
        HadoopConfiguration cfg = new HadoopConfiguration();

        // The weighted planner is installed as is, without any customization.
        cfg.setMapReducePlanner(new IgniteHadoopWeightedMapReducePlanner());

        return cfg;
    }
}