diff --git 
new file mode 100644
index 0000000..f542cf2
--- /dev/null
@@ -0,0 +1,228 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
+ * Context for test purpose.
+ */
+class HadoopTestTaskContext extends HadoopV2TaskContext {
+    /**
+     * Simple key-vale pair.
+     * @param <K> Key class.
+     * @param <V> Value class.
+     */
+    public static class Pair<K,V> {
+        /** Key */
+        private K key;
+        /** Value */
+        private V val;
+        /**
+         * @param key key.
+         * @param val value.
+         */
+        Pair(K key, V val) {
+            this.key = key;
+            this.val = val;
+        }
+        /**
+         * Getter of key.
+         * @return key.
+         */
+        K key() {
+            return key;
+        }
+        /**
+         * Getter of value.
+         * @return value.
+         */
+        V value() {
+            return val;
+        }
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return key + "," + val;
+        }
+    }
+    /** Mock output container- result data of task execution if it is not 
overridden. */
+    private List<Pair<String, Integer>> mockOutput = new ArrayList<>();
+    /** Mock input container- input data if it is not overridden. */
+    private Map<Object,List> mockInput = new TreeMap<>();
+    /** Context output implementation to write data into mockOutput. */
+    private HadoopTaskOutput output = new HadoopTaskOutput() {
+        /** {@inheritDoc} */
+        @Override public void write(Object key, Object val) {
+            //Check of casting and extract/copy values
+            String strKey = new String(((Text)key).getBytes());
+            int intVal = ((IntWritable)val).get();
+            mockOutput().add(new Pair<>(strKey, intVal));
+        }
+        /** {@inheritDoc} */
+        @Override public void close() {
+            throw new UnsupportedOperationException();
+        }
+    };
+    /** Context input implementation to read data from mockInput. */
+    private HadoopTaskInput input = new HadoopTaskInput() {
+        /** Iterator of keys and associated lists of values. */
+        Iterator<Map.Entry<Object, List>> iter;
+        /** Current key and associated value list. */
+        Map.Entry<Object, List> currEntry;
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (iter == null)
+                iter = mockInput().entrySet().iterator();
+            if (iter.hasNext())
+                currEntry =;
+            else
+                currEntry = null;
+            return currEntry != null;
+        }
+        /** {@inheritDoc} */
+        @Override public Object key() {
+            return currEntry.getKey();
+        }
+        /** {@inheritDoc} */
+        @Override public Iterator<?> values() {
+            return currEntry.getValue().iterator() ;
+        }
+        /** {@inheritDoc} */
+        @Override public void close() {
+            throw new UnsupportedOperationException();
+        }
+    };
+    /**
+     * Getter of mock output container - result of task if it is not 
+     *
+     * @return mock output.
+     */
+    public List<Pair<String, Integer>> mockOutput() {
+        return mockOutput;
+    }
+    /**
+     * Getter of mock input container- input data if it is not overridden.
+     *
+     * @return mock output.
+     */
+    public Map<Object, List> mockInput() {
+        return mockInput;
+    }
+    /**
+     * Generate one-key-multiple-values tree from array of key-value pairs, 
and wrap its into Writable objects.
+     * The result is placed into mock input.
+     *
+     * @param flatData list of key-value pair.
+     */
+    public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) {
+        Text key = new Text();
+        for (HadoopTestTaskContext.Pair<String, Integer> pair : flatData) {
+            key.set(pair.key);
+            ArrayList<IntWritable> valList;
+            if (!mockInput.containsKey(key)) {
+                valList = new ArrayList<>();
+                mockInput.put(key, valList);
+                key = new Text();
+            }
+            else
+                valList = (ArrayList<IntWritable>) mockInput.get(key);
+            valList.add(new IntWritable(pair.value()));
+        }
+    }
+    /**
+     * @param taskInfo Task info.
+     * @param gridJob Grid Hadoop job.
+     */
+    public HadoopTestTaskContext(HadoopTaskInfo taskInfo, HadoopJob gridJob) 
throws IgniteCheckedException {
+        super(taskInfo, gridJob,, null, 
+    }
+    /**
+     * Creates DataInput to read JobConf.
+     *
+     * @param job Job.
+     * @return DataInput with JobConf.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static DataInput jobConfDataInput(HadoopJob job) throws 
IgniteCheckedException {
+        JobConf jobConf = new JobConf();
+        for (Map.Entry<String, String> e : 
+            jobConf.set(e.getKey(), e.getValue());
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+        try {
+            jobConf.write(new DataOutputStream(buf));
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+        return new DataInputStream(new 
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopTaskOutput output() {
+        return output;
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopTaskInput input() {
+        return input;
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..da0d922
--- /dev/null
@@ -0,0 +1,178 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+ * Utility class for tests.
+ */
+public class HadoopTestUtils {
+    /** Base test directory. */
+    private static final File BASE_TEST_DIR = new File(U.getIgniteHome() + 
+    /**
+     * @return Base directory for tests.
+     */
+    public static File baseTestDir() {
+        return BASE_TEST_DIR;
+    }
+    /**
+     * Get test directory.
+     *
+     * @param parts Parts.
+     * @return Directory.
+     */
+    public static File testDir(String... parts) {
+        File res = BASE_TEST_DIR;
+        if (parts != null) {
+            for (String part : parts)
+                res = new File(res, part);
+        }
+        return res;
+    }
+    /**
+     * Clear base test directory.
+     */
+    public static void clearBaseTestDir() {
+        if (baseTestDir().exists())
+            assert delete(baseTestDir());
+    }
+    /**
+     * Checks that job statistics file contains valid strings only.
+     *
+     * @param reader Buffered reader to get lines of job statistics.
+     * @return Amount of events.
+     * @throws IOException If failed.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static long simpleCheckJobStatFile(BufferedReader reader) throws 
IOException {
+        Collection<String> phases = new HashSet<>();
+        phases.add("submit");
+        phases.add("prepare");
+        phases.add("start");
+        phases.add("finish");
+        phases.add("requestId");
+        phases.add("responseId");
+        Collection<String> evtTypes = new HashSet<>();
+        evtTypes.add("JOB");
+        evtTypes.add("SETUP");
+        evtTypes.add("MAP");
+        evtTypes.add("SHUFFLE");
+        evtTypes.add("REDUCE");
+        evtTypes.add("COMBINE");
+        evtTypes.add("COMMIT");
+        long evtCnt = 0;
+        String line;
+        Map<Long, String> reduceNodes = new HashMap<>();
+        while((line = reader.readLine()) != null) {
+            String[] splitLine = line.split(":");
+            //Try parse timestamp
+            Long.parseLong(splitLine[1]);
+            String[] evt = splitLine[0].split(" ");
+            assertTrue("Unknown event '" + evt[0] + "'", 
+            String phase;
+            if ("JOB".equals(evt[0]))
+                phase = evt[1];
+            else {
+                assertEquals(4, evt.length);
+                assertTrue("The node id is not defined", !F.isEmpty(evt[3]));
+                long taskNum = Long.parseLong(evt[1]);
+                if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) {
+                    String nodeId = reduceNodes.get(taskNum);
+                    if (nodeId == null)
+                        reduceNodes.put(taskNum, evt[3]);
+                    else
+                        assertEquals("Different nodes for SHUFFLE and REDUCE 
tasks", nodeId, evt[3]);
+                }
+                phase = evt[2];
+            }
+            assertTrue("Unknown phase '" + phase + "' in " + 
Arrays.toString(evt), phases.contains(phase));
+            evtCnt++;
+        }
+        return evtCnt;
+    }
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param file File or directory to delete.
+     * @return {@code true} if and only if the file or directory is 
successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(@Nullable File file) {
+        if (file == null)
+            return false;
+        boolean res = true;
+        if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            if (files != null && files.length > 0)
+                for (File file1 : files)
+                    if (file1.isDirectory())
+                        res &= delete(file1);
+                    else
+                        res &= file1.delete();
+            res &= file.delete();
+        }
+        else
+            res = file.delete();
+        return res;
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..9e3c8f4
--- /dev/null
@@ -0,0 +1,260 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+ * Tests for user libs parsing.
+ */
+public class HadoopUserLibsSelfTest extends GridCommonAbstractTest {
+    /** Directory 1. */
+    private static final File DIR_1 = HadoopTestUtils.testDir("dir1");
+    /** File 1 in directory 1. */
+    private static final File FILE_1_1 = new File(DIR_1, "file1.jar");
+    /** File 2 in directory 1. */
+    private static final File FILE_1_2 = new File(DIR_1, "file2.jar");
+    /** Directory 2. */
+    private static final File DIR_2 = HadoopTestUtils.testDir("dir2");
+    /** File 1 in directory 2. */
+    private static final File FILE_2_1 = new File(DIR_2, "file1.jar");
+    /** File 2 in directory 2. */
+    private static final File FILE_2_2 = new File(DIR_2, "file2.jar");
+    /** Missing directory. */
+    private static final File MISSING_DIR = 
+    /** Missing file. */
+    private static final File MISSING_FILE = new File(MISSING_DIR, "file.jar");
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        HadoopTestUtils.clearBaseTestDir();
+        assert DIR_1.mkdirs();
+        assert DIR_2.mkdirs();
+        assert FILE_1_1.createNewFile();
+        assert FILE_1_2.createNewFile();
+        assert FILE_2_1.createNewFile();
+        assert FILE_2_2.createNewFile();
+    }
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Sanity checks before test start.
+        ensureExists(FILE_1_1);
+        ensureExists(FILE_1_2);
+        ensureExists(FILE_2_1);
+        ensureExists(FILE_2_2);
+        ensureNotExists(MISSING_DIR);
+        ensureNotExists(MISSING_FILE);
+    }
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        HadoopTestUtils.clearBaseTestDir();
+    }
+    /**
+     * Test null or empty user libs.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNullOrEmptyUserLibs() throws Exception {
+        assert parse(null).isEmpty();
+        assert parse("").isEmpty();
+    }
+    /**
+     * Test single file.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingle() throws Exception {
+        Collection<File> res = parse(single(FILE_1_1));
+        assert res.size() == 1;
+        assert res.contains(FILE_1_1);
+        res = parse(single(MISSING_FILE));
+        assert res.size() == 0;
+    }
+    /**
+     * Test multiple files.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultiple() throws Exception {
+        Collection<File> res =
+            parse(merge(single(FILE_1_1), single(FILE_1_2), single(FILE_2_1), 
single(FILE_2_2), single(MISSING_FILE)));
+        assert res.size() == 4;
+        assert res.contains(FILE_1_1);
+        assert res.contains(FILE_1_2);
+        assert res.contains(FILE_2_1);
+        assert res.contains(FILE_2_2);
+    }
+    /**
+     * Test single wildcard.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleWildcard() throws Exception {
+        Collection<File> res = parse(wildcard(DIR_1));
+        assert res.size() == 2;
+        assert res.contains(FILE_1_1);
+        assert res.contains(FILE_1_2);
+        res = parse(wildcard(MISSING_DIR));
+        assert res.size() == 0;
+    }
+    /**
+     * Test multiple wildcards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultipleWildcards() throws Exception {
+        Collection<File> res = parse(merge(wildcard(DIR_1), wildcard(DIR_2), 
+        assert res.size() == 4;
+        assert res.contains(FILE_1_1);
+        assert res.contains(FILE_1_2);
+        assert res.contains(FILE_2_1);
+        assert res.contains(FILE_2_2);
+    }
+    /**
+     * Test mixed tokens.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMixed() throws Exception {
+        String str = merge(
+            single(FILE_1_1),
+            wildcard(DIR_2),
+            single(MISSING_FILE),
+            wildcard(MISSING_DIR)
+        );
+        Collection<File> res = parse(str);
+        assert res.size() == 3;
+        assert res.contains(FILE_1_1);
+        assert res.contains(FILE_2_1);
+        assert res.contains(FILE_2_2);
+    }
+    /**
+     * Ensure provided file exists.
+     *
+     * @param file File.
+     */
+    private static void ensureExists(File file) {
+        assert file.exists();
+    }
+    /**
+     * Ensure provided file doesn't exist.
+     *
+     * @param file File.
+     */
+    private static void ensureNotExists(File file) {
+        assert !file.exists();
+    }
+    /**
+     * Merge string using path separator.
+     *
+     * @param vals Values.
+     * @return Result.
+     */
+    private static String merge(String... vals) {
+        StringBuilder res = new StringBuilder();
+        if (vals != null) {
+            boolean first = true;
+            for (String val : vals) {
+                if (first)
+                    first = false;
+                else
+                    res.append(File.pathSeparatorChar);
+                res.append(val);
+            }
+        }
+        return res.toString();
+    }
+    /**
+     * Parse string.
+     *
+     * @param str String.
+     * @return Files.
+     * @throws IOException If failed.
+     */
+    Collection<File> parse(String str) throws IOException {
+        Collection<HadoopClasspathUtils.SearchDirectory> dirs = 
+        Collection<File> res = new HashSet<>();
+        for (HadoopClasspathUtils.SearchDirectory dir : dirs)
+            Collections.addAll(res, dir.files());
+        return res;
+    }
+    /**
+     * Get absolute path to a single file.
+     *
+     * @param file File.
+     * @return Path.
+     */
+    private static String single(File file) {
+        return file.getAbsolutePath();
+    }
+    /**
+     * Create a wildcard.
+     *
+     * @param file File.
+     * @return Wildcard.
+     */
+    private static String wildcard(File file) {
+        return file.getAbsolutePath() + File.separatorChar + "*";
+    }
diff --git 
new file mode 100644
index 0000000..ae2c00d
--- /dev/null
@@ -0,0 +1,100 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import java.util.UUID;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
+import static 
+ * Self test of {@link 
+ */
+public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String TEST_SERIALIZED_VALUE = "Test serialized 
+    /**
+     * Custom serialization class that accepts {@link Writable}.
+     */
+    private static class CustomSerialization extends WritableSerialization {
+        /** {@inheritDoc} */
+        @Override public Deserializer<Writable> 
getDeserializer(Class<Writable> c) {
+            return new Deserializer<Writable>() {
+                @Override public void open(InputStream in) { }
+                @Override public Writable deserialize(Writable writable) {
+                    return new Text(TEST_SERIALIZED_VALUE);
+                }
+                @Override public void close() { }
+            };
+        }
+    }
+    /**
+     * Tests that {@link HadoopJob} provides wrapped serializer if it's set in 
+     *
+     * @throws IgniteCheckedException If fails.
+     */
+    public void testCustomSerializationApplying() throws 
IgniteCheckedException {
+        JobConf cfg = new JobConf();
+        cfg.setMapOutputKeyClass(IntWritable.class);
+        cfg.setMapOutputValueClass(Text.class);
+        cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, 
+        HadoopDefaultJobInfo info = createJobInfo(cfg);
+        final UUID uuid = UUID.randomUUID();
+        HadoopJobId id = new HadoopJobId(uuid, 1);
+        HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null);
+        HadoopTaskContext taskCtx = job.getTaskContext(new 
HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
+            null));
+        HadoopSerialization ser = taskCtx.keySerialization();
+        assertEquals(HadoopSerializationWrapper.class.getName(), 
+        DataInput in = new DataInputStream(new ByteArrayInputStream(new 
+        assertEquals(TEST_SERIALIZED_VALUE,, null).toString());
+        ser = taskCtx.valueSerialization();
+        assertEquals(HadoopSerializationWrapper.class.getName(), 
+        assertEquals(TEST_SERIALIZED_VALUE,, null).toString());
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..1496150
--- /dev/null
@@ -0,0 +1,53 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.configuration.IgniteConfiguration;
+ * Configuration validation tests.
+ */
+public class HadoopValidationSelfTest extends HadoopAbstractSelfTest {
+    /** Peer class loading enabled flag. */
+    public boolean peerClassLoading;
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+        peerClassLoading = false;
+    }
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+        cfg.setPeerClassLoadingEnabled(peerClassLoading);
+        return cfg;
+    }
+    /**
+     * Ensure that Grid starts when all configuration parameters are valid.
+     *
+     * @throws Exception If failed.
+     */
+    public void testValid() throws Exception {
+        startGrids(1);
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..4e7cc50
--- /dev/null
@@ -0,0 +1,599 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
+import org.apache.ignite.internal.processors.igfs.IgfsMock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+ * Tests for weighted map-reduce planned.
+ */
+public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest 
+    /** ID 1. */
+    private static final UUID ID_1 = new UUID(0, 1);
+    /** ID 2. */
+    private static final UUID ID_2 = new UUID(0, 2);
+    /** ID 3. */
+    private static final UUID ID_3 = new UUID(0, 3);
+    /** MAC 1. */
+    private static final String MAC_1 = "mac1";
+    /** MAC 2. */
+    private static final String MAC_2 = "mac2";
+    /** MAC 3. */
+    private static final String MAC_3 = "mac3";
+    /** Host 1. */
+    private static final String HOST_1 = "host1";
+    /** Host 2. */
+    private static final String HOST_2 = "host2";
+    /** Host 3. */
+    private static final String HOST_3 = "host3";
+    /** Host 4. */
+    private static final String HOST_4 = "host4";
+    /** Host 5. */
+    private static final String HOST_5 = "host5";
+    /** Standard node 1. */
+    private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1);
+    /** Standard node 2. */
+    private static final MockNode NODE_2 = new MockNode(ID_2, MAC_2, HOST_2);
+    /** Standard node 3. */
+    private static final MockNode NODE_3 = new MockNode(ID_3, MAC_3, HOST_3);
+    /** Standard nodes. */
+    private static final Collection<ClusterNode> NODES;
+    /**
+     * Static initializer.
+     */
+    static {
+        NODES = new ArrayList<>();
+        NODES.add(NODE_1);
+        NODES.add(NODE_2);
+        NODES.add(NODE_3);
+    }
+    /**
+     * Test one IGFS split being assigned to affinity node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOneIgfsSplitAffinity() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, 
NODE_2).add(100, NODE_3).buildIgfs();
+        List<HadoopInputSplit> splits = new ArrayList<>();
+        splits.add(new HadoopFileBlock(new String[] { HOST_1 }, 
URI.create("igfs://igfs@/file"), 0, 50));
+        final int expReducers = 4;
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, 
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+        HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+        assert plan.mappers() == 1;
+        assert plan.mapperNodeIds().size() == 1;
+        assert plan.mapperNodeIds().contains(ID_1);
+        checkPlanMappers(plan, splits, NODES, false/*only 1 split*/);
+        checkPlanReducers(plan, NODES, expReducers, false/* because of 
threshold behavior.*/);
+    }
+    /**
+     * Test one HDFS splits.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHdfsSplitsAffinity() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, 
NODE_2).add(100, NODE_3).buildIgfs();
+        final List<HadoopInputSplit> splits = new ArrayList<>();
+        splits.add(new HadoopFileBlock(new String[] { HOST_1 }, 
URI.create("hfds://" + HOST_1 + "/x"), 0, 50));
+        splits.add(new HadoopFileBlock(new String[] { HOST_2 }, 
URI.create("hfds://" + HOST_2 + "/x"), 50, 100));
+        splits.add(new HadoopFileBlock(new String[] { HOST_3 }, 
URI.create("hfds://" + HOST_3 + "/x"), 100, 37));
+        // The following splits belong to hosts that are out of Ignite 
topology at all.
+        // This means that these splits should be assigned to any least loaded 
+        splits.add(new HadoopFileBlock(new String[] { HOST_4 }, 
URI.create("hfds://" + HOST_4 + "/x"), 138, 2));
+        splits.add(new HadoopFileBlock(new String[] { HOST_5 }, 
URI.create("hfds://" + HOST_5 + "/x"), 140, 3));
+        final int expReducers = 7;
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, 
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+        final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+        checkPlanMappers(plan, splits, NODES, true);
+        checkPlanReducers(plan, NODES, expReducers, true);
+    }
+    /**
+     * Test HDFS splits with Replication == 3.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHdfsSplitsReplication() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, 
NODE_2).add(100, NODE_3).buildIgfs();
+        final List<HadoopInputSplit> splits = new ArrayList<>();
+        splits.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 
}, URI.create("hfds://" + HOST_1 + "/x"), 0, 50));
+        splits.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 
}, URI.create("hfds://" + HOST_2 + "/x"), 50, 100));
+        splits.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 
}, URI.create("hfds://" + HOST_3 + "/x"), 100, 37));
+        // The following splits belong to hosts that are out of Ignite 
topology at all.
+        // This means that these splits should be assigned to any least loaded 
+        splits.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 
}, URI.create("hfds://" + HOST_4 + "/x"), 138, 2));
+        splits.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 
}, URI.create("hfds://" + HOST_5 + "/x"), 140, 3));
+        final int expReducers = 8;
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, 
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+        final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+        checkPlanMappers(plan, splits, NODES, true);
+        checkPlanReducers(plan, NODES, expReducers, true);
+    }
+    /**
+     * Get all IDs.
+     *
+     * @param nodes Nodes.
+     * @return IDs.
+     */
+    private static Set<UUID> allIds(Collection<ClusterNode> nodes) {
+        Set<UUID> allIds = new HashSet<>();
+        for (ClusterNode n : nodes)
+            allIds.add(;
+        return allIds;
+    }
+    /**
+     * Check mappers for the plan.
+     *
+     * @param plan Plan.
+     * @param splits Splits.
+     * @param nodes Nodes.
+     * @param expectUniformity WHether uniformity is expected.
+     */
+    private static void checkPlanMappers(HadoopMapReducePlan plan, 
List<HadoopInputSplit> splits,
+        Collection<ClusterNode> nodes, boolean expectUniformity) {
+        // Number of mappers should correspomd to the number of input splits:
+        assertEquals(splits.size(), plan.mappers());
+        if (expectUniformity) {
+            // mappers are assigned to all available nodes:
+            assertEquals(nodes.size(), plan.mapperNodeIds().size());
+            assertEquals(allIds(nodes), plan.mapperNodeIds());
+        }
+        // Check all splits are covered by mappers:
+        Set<HadoopInputSplit> set = new HashSet<>();
+        for (UUID id: plan.mapperNodeIds()) {
+            Collection<HadoopInputSplit> sp = plan.mappers(id);
+            assert sp != null;
+            for (HadoopInputSplit s: sp)
+                assertTrue(set.add(s));
+        }
+        // must be of the same size & contain same elements:
+        assertEquals(set, new HashSet<>(splits));
+    }
+    /**
+     * Check plan reducers.
+     *
+     * @param plan Plan.
+     * @param nodes Nodes.
+     * @param expReducers Expected reducers.
+     * @param expectUniformity Expected uniformity.
+     */
+    private static void checkPlanReducers(HadoopMapReducePlan plan,
+        Collection<ClusterNode> nodes, int expReducers, boolean 
expectUniformity) {
+        assertEquals(expReducers, plan.reducers());
+        if (expectUniformity)
+            assertEquals(allIds(nodes), plan.reducerNodeIds());
+        int sum = 0;
+        int lenSum = 0;
+        for (UUID uuid: plan.reducerNodeIds()) {
+            int[] rr = plan.reducers(uuid);
+            assert rr != null;
+            lenSum += rr.length;
+            for (int i: rr)
+                sum += i;
+        }
+        assertEquals(expReducers, lenSum);
+        // Numbers in the arrays must be consequtive integers stating from 0,
+        // check that simply calculating their total sum:
+        assertEquals((lenSum * (lenSum - 1) / 2), sum);
+    }
+    /**
+     * Create planner for IGFS.
+     *
+     * @param igfs IGFS.
+     * @return Planner.
+     */
+    private static IgniteHadoopWeightedMapReducePlanner createPlanner(IgfsMock 
igfs) {
+        IgniteHadoopWeightedMapReducePlanner planner = new 
+        IgfsIgniteMock ignite = new IgfsIgniteMock(null, igfs);
+        GridTestUtils.setFieldValue(planner, 
HadoopAbstractMapReducePlanner.class, "ignite", ignite);
+        return planner;
+    }
+    /**
+     * Throw {@link UnsupportedOperationException}.
+     */
+    private static void throwUnsupported() {
+        throw new UnsupportedOperationException("Should not be called!");
+    }
+    /**
+     * Mocked node.
+     */
+    private static class MockNode implements ClusterNode {
+        /** ID. */
+        private final UUID id;
+        /** MAC addresses. */
+        private final String macs;
+        /** Addresses. */
+        private final List<String> addrs;
+        /**
+         * Constructor.
+         *
+         * @param id Node ID.
+         * @param macs MAC addresses.
+         * @param addrs Addresses.
+         */
+        public MockNode(UUID id, String macs, String... addrs) {
+            assert addrs != null;
+   = id;
+            this.macs = macs;
+            this.addrs = Arrays.asList(addrs);
+        }
+        /** {@inheritDoc} */
+        @Override public UUID id() {
+            return id;
+        }
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Nullable @Override public <T> T attribute(String name) {
+            if (F.eq(name, IgniteNodeAttributes.ATTR_MACS))
+                return (T)macs;
+            throwUnsupported();
+            return null;
+        }
+        /** {@inheritDoc} */
+        @Override public Collection<String> addresses() {
+            return addrs;
+        }
+        /** {@inheritDoc} */
+        @Override public Object consistentId() {
+            throwUnsupported();
+            return null;
+        }
+        /** {@inheritDoc} */
+        @Override public ClusterMetrics metrics() {
+            throwUnsupported();
+            return null;
+        }
+        /** {@inheritDoc} */
+        @Override public Map<String, Object> attributes() {
+            throwUnsupported();
+            return null;
+        }
+        /** {@inheritDoc} */
+        @Override public Collection<String> hostNames() {
+            throwUnsupported();
+            return null;
+        }
+        /** {@inheritDoc} */
+        @Override public long order() {
+            throwUnsupported();
+            return 0;
+        }
+        /** {@inheritDoc} */
+        @Override public IgniteProductVersion version() {
+            throwUnsupported();
+            return null;
+        }
+        /** {@inheritDoc} */
+        @Override public boolean isLocal() {
+            throwUnsupported();
+            return false;
+        }
+        /** {@inheritDoc} */
+        @Override public boolean isDaemon() {
+            throwUnsupported();
+            return false;
+        }
+        /** {@inheritDoc} */
+        @Override public boolean isClient() {
+            throwUnsupported();
+            return false;
+        }
+    }
+    /**
+     * Locations builder.
+     */
+    private static class LocationsBuilder {
+        /** Locations. */
+        private final TreeMap<Long, Collection<MockNode>> locs = new 
+        /**
+         * Create new locations builder.
+         *
+         * @return Locations builder.
+         */
+        public static LocationsBuilder create() {
+            return new LocationsBuilder();
+        }
+        /**
+         * Add locations.
+         *
+         * @param start Start.
+         * @param nodes Nodes.
+         * @return This builder for chaining.
+         */
+        public LocationsBuilder add(long start, MockNode... nodes) {
+            locs.put(start, Arrays.asList(nodes));
+            return this;
+        }
+        /**
+         * Build locations.
+         *
+         * @return Locations.
+         */
+        public TreeMap<Long, Collection<MockNode>> build() {
+            return locs;
+        }
+        /**
+         * Build IGFS.
+         *
+         * @return IGFS.
+         */
+        public MockIgfs buildIgfs() {
+            return new MockIgfs(build());
+        }
+    }
+    /**
+     * Mocked IGFS.
+     */
+    private static class MockIgfs extends IgfsMock {
+        /** Block locations. */
+        private final TreeMap<Long, Collection<MockNode>> locs;
+        /**
+         * Constructor.
+         *
+         * @param locs Block locations.
+         */
+        public MockIgfs(TreeMap<Long, Collection<MockNode>> locs) {
+            super("igfs");
+            this.locs = locs;
+        }
+        /** {@inheritDoc} */
+        @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, 
long start, long len) {
+            Collection<IgfsBlockLocation> res = new ArrayList<>();
+            long cur = start;
+            long remaining = len;
+            long prevLocStart = -1;
+            Collection<MockNode> prevLocNodes = null;
+            for (Map.Entry<Long, Collection<MockNode>> locEntry : 
locs.entrySet()) {
+                long locStart = locEntry.getKey();
+                Collection<MockNode> locNodes = locEntry.getValue();
+                if (prevLocNodes != null) {
+                    if (cur < locStart) {
+                        // Add part from previous block.
+                        long prevLen = locStart - prevLocStart;
+                        res.add(new IgfsBlockLocationMock(cur, prevLen, 
+                        cur = locStart;
+                        remaining -= prevLen;
+                    }
+                }
+                prevLocStart = locStart;
+                prevLocNodes = locNodes;
+                if (remaining == 0)
+                    break;
+            }
+            // Add remainder.
+            if (remaining != 0)
+                res.add(new IgfsBlockLocationMock(cur, remaining, 
+            return res;
+        }
+        /** {@inheritDoc} */
+        @Override public boolean exists(IgfsPath path) {
+            return true;
+        }
+        /** {@inheritDoc} */
+        @Override public boolean isProxy(URI path) {
+            return false;
+        }
+    }
+    /**
+     * Mocked block location.
+     */
+    private static class IgfsBlockLocationMock implements IgfsBlockLocation {
+        /** Start. */
+        private final long start;
+        /** Length. */
+        private final long len;
+        /** Node IDs. */
+        private final List<UUID> nodeIds;
+        /**
+         * Constructor.
+         *
+         * @param start Start.
+         * @param len Length.
+         * @param nodes Nodes.
+         */
+        public IgfsBlockLocationMock(long start, long len, 
Collection<MockNode> nodes) {
+            this.start = start;
+            this.len = len;
+            this.nodeIds = new ArrayList<>(nodes.size());
+            for (MockNode node : nodes)
+                nodeIds.add(;
+        }
+        /** {@inheritDoc} */
+        @Override public long start() {
+            return start;
+        }
+        /** {@inheritDoc} */
+        @Override public long length() {
+            return len;
+        }
+        /** {@inheritDoc} */
+        @Override public Collection<UUID> nodeIds() {
+            return nodeIds;
+        }
+        /** {@inheritDoc} */
+        @Override public Collection<String> names() {
+            throwUnsupported();
+            return null;
+        }
+        /** {@inheritDoc} */
+        @Override public Collection<String> hosts() {
+            throwUnsupported();
+            return null;
+        }
+    }
diff --git 
new file mode 100644
index 0000000..e0403c2
--- /dev/null
@@ -0,0 +1,38 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+ * Tests whole map-red execution Weighted planner.
+ */
+public class HadoopWeightedPlannerMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected HadoopConfiguration createHadoopConfiguration() {
+        HadoopConfiguration hadoopCfg = new HadoopConfiguration();
+        // Use weighted planner with default settings:
+        IgniteHadoopWeightedMapReducePlanner planner = new 
+        hadoopCfg.setMapReducePlanner(planner);
+        return hadoopCfg;
+    }

Reply via email to