http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
new file mode 100644
index 0000000..a69b72a
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
+import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
+import org.apache.ignite.internal.processors.igfs.IgfsMock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Self test for the default Ignite Hadoop map-reduce planner.
+ */
+public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final UUID ID_1 = new UUID(0, 1);
+
+    /** */
+    private static final UUID ID_2 = new UUID(0, 2);
+
+    /** */
+    private static final UUID ID_3 = new UUID(0, 3);
+
+    /** */
+    private static final String HOST_1 = "host1";
+
+    /** */
+    private static final String HOST_2 = "host2";
+
+    /** */
+    private static final String HOST_3 = "host3";
+
+    /** */
+    private static final String INVALID_HOST_1 = "invalid_host1";
+
+    /** */
+    private static final String INVALID_HOST_2 = "invalid_host2";
+
+    /** */
+    private static final String INVALID_HOST_3 = "invalid_host3";
+
+    /** Mocked IGFS. */
+    private static final IgniteFileSystem IGFS = new MockIgfs();
+
+    /** Mocked Grid. */
+    private static final IgfsIgniteMock GRID = new IgfsIgniteMock(null, IGFS);
+
+    /** Planner. */
+    private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner();
+
+    /** Block locations. */
+    private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>();
+
+    /** Proxy map. */
+    private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>();
+
+    /** Last created plan. */
+    private static final ThreadLocal<HadoopMapReducePlan> PLAN = new ThreadLocal<>();
+
+    /**
+     * Static initializer.
+     */
+    static {
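+        // Inject the mocked Ignite instance into the planner.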
+        GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "ignite", GRID);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "log", log());
+
+        BLOCK_MAP.clear();
+        PROXY_MAP.clear();
+    }
+
+    /**
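+     * Test planning when each IGFS block is hosted by exactly one node.
+     *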
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testIgfsOneBlockPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2);
+        HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3);
+
+        mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1));
+        mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2));
+        mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3));
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 2);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
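+     * Test planning for non-IGFS splits when each block is hosted by exactly one node.
+     *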
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3);
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 2);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
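+     * Test planning when each IGFS block is replicated on several nodes.
+     *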
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3);
+
+        mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2));
+        mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2));
+        mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3));
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
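+     * Test planning for non-IGFS splits when each block is hosted by several nodes.
+     *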
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3);
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
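+     * Test planning when a single split spans several IGFS block locations on different nodes.
+     *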
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3);
+
+        mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3));
+        mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3));
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split2);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_1);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+    }
+
+    /**
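+     * Test planning for splits whose hosts do not match any node in the topology.
+     *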
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testNonIgfsOrphans() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3);
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2);
+
+        plan(1, split1, split2, split3);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) ||
+            ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) ||
+            ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1);
+
+        plan(3, split1, split2, split3);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
+     * Create plan.
+     *
+     * @param reducers Reducers count.
+     * @param splits Splits.
+     * @return Plan.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static HadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException {
+        assert reducers > 0;
+        assert splits != null && splits.length > 0;
+
+        Collection<HadoopInputSplit> splitList = new ArrayList<>(splits.length);
+
+        Collections.addAll(splitList, splits);
+
+        Collection<ClusterNode> top = new ArrayList<>();
+
+        GridTestNode node1 = new GridTestNode(ID_1);
+        GridTestNode node2 = new GridTestNode(ID_2);
+        GridTestNode node3 = new GridTestNode(ID_3);
+
+        node1.setHostName(HOST_1);
+        node2.setHostName(HOST_2);
+        node3.setHostName(HOST_3);
+
+        top.add(node1);
+        top.add(node2);
+        top.add(node3);
+
+        HadoopMapReducePlan plan = PLANNER.preparePlan(new HadoopPlannerMockJob(splitList, reducers), top, null);
+
+        PLAN.set(plan);
+
+        return plan;
+    }
+
+    /**
+     * Ensure that node contains the given mappers.
+     *
+     * @param nodeId Node ID.
+     * @param expSplits Expected splits.
+     * @return {@code True} if this assumption is valid.
+     */
+    private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) {
+        Collection<HadoopInputSplit> expSplitsCol = new ArrayList<>();
+
+        Collections.addAll(expSplitsCol, expSplits);
+
+        Collection<HadoopInputSplit> splits = PLAN.get().mappers(nodeId);
+
+        return F.eq(expSplitsCol, splits);
+    }
+
+    /**
+     * Ensure that node contains the given amount of reducers.
+     *
+     * @param nodeId Node ID.
+     * @param reducers Reducers.
+     * @return {@code True} if this assumption is valid.
+     */
+    private static boolean ensureReducers(UUID nodeId, int reducers) {
+        int[] reducersArr = PLAN.get().reducers(nodeId);
+
+        return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers);
+    }
+
+    /**
+     * Ensure that no mappers or reducers are located on the given node.
+     *
+     * @param nodeId Node ID.
+     * @return {@code True} if this assumption is valid.
+     */
+    private static boolean ensureEmpty(UUID nodeId) {
+        return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId));
+    }
+
+    /**
+     * Create split.
+     *
+     * @param igfs IGFS flag.
+     * @param file File.
+     * @param start Start.
+     * @param len Length.
+     * @param hosts Hosts.
+     * @return Split.
+     */
+    private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) {
+        URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file);
+
+        return new HadoopFileBlock(hosts, uri, start, len);
+    }
+
+    /**
+     * Create block location.
+     *
+     * @param start Start.
+     * @param len Length.
+     * @param nodeIds Node IDs.
+     * @return Block location.
+     */
+    private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) {
+        assert nodeIds != null && nodeIds.length > 0;
+
+        Collection<ClusterNode> nodes = new ArrayList<>(nodeIds.length);
+
+        for (UUID id : nodeIds)
+            nodes.add(new GridTestNode(id));
+
+        return new IgfsBlockLocationImpl(start, len, nodes);
+    }
+
+    /**
+     * Map IGFS block to nodes.
+     *
+     * @param file File.
+     * @param start Start.
+     * @param len Length.
+     * @param locations Locations.
+     */
+    private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) {
+        assert locations != null && locations.length > 0;
+
+        IgfsPath path = new IgfsPath(file);
+
+        Block block = new Block(path, start, len);
+
+        Collection<IgfsBlockLocation> locationsList = new ArrayList<>();
+
+        Collections.addAll(locationsList, locations);
+
+        BLOCK_MAP.put(block, locationsList);
+    }
+
+    /**
+     * Block.
+     */
+    private static class Block {
+        /** */
+        private final IgfsPath path;
+
+        /** */
+        private final long start;
+
+        /** */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param path Path.
+         * @param start Start.
+         * @param len Length.
+         */
+        private Block(IgfsPath path, long start, long len) {
+            this.path = path;
+            this.start = start;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("RedundantIfStatement")
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Block)) return false;
+
+            Block block = (Block) o;
+
+            if (len != block.len)
+                return false;
+
+            if (start != block.start)
+                return false;
+
+            if (!path.equals(block.path))
+                return false;
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = path.hashCode();
+
+            res = 31 * res + (int) (start ^ (start >>> 32));
+            res = 31 * res + (int) (len ^ (len >>> 32));
+
+            return res;
+        }
+    }
+
+    /**
+     * Mocked IGFS.
+     */
+    private static class MockIgfs extends IgfsMock {
+        /**
+         * Constructor.
+         */
+        public MockIgfs() {
+            super("igfs");
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isProxy(URI path) {
+            return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
+            return BLOCK_MAP.get(new Block(path, start, len));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean exists(IgfsPath path) {
+            return true;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java
new file mode 100644
index 0000000..843b42b
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Error simulator.
+ */
+public class HadoopErrorSimulator {
+    /** No-op singleton instance. */
+    public static final HadoopErrorSimulator noopInstance = new HadoopErrorSimulator();
+
+    /** Instance ref. */
+    private static final AtomicReference<HadoopErrorSimulator> ref = new AtomicReference<>(noopInstance);
+
+    /**
+     * Creates simulator of given kind with given stage bits.
+     *
+     * @param kind The kind.
+     * @param bits The stage bits.
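+     *             Each bit, from bit 0 (map configure) through bit 11 (reduce cleanup), enables an
+     *             error on the corresponding map, combine or reduce stage.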
+     * @return The simulator.
+     */
+    public static HadoopErrorSimulator create(Kind kind, int bits) {
+        switch (kind) {
+            case Noop:
+                return noopInstance;
+            case Runtime:
+                return new RuntimeExceptionBitHadoopErrorSimulator(bits);
+            case IOException:
+                return new IOExceptionBitHadoopErrorSimulator(bits);
+            case Error:
+                return new ErrorBitHadoopErrorSimulator(bits);
+            default:
+                throw new IllegalStateException("Unknown kind: " + kind);
+        }
+    }
+
+    /**
+     * Gets the error simulator instance.
+     */
+    public static HadoopErrorSimulator instance() {
+        return ref.get();
+    }
+
+    /**
+     * Sets instance.
+     */
+    public static boolean setInstance(HadoopErrorSimulator expect, HadoopErrorSimulator update) {
+        return ref.compareAndSet(expect, update);
+    }
+
+    /**
+     * Constructor.
+     */
+    private HadoopErrorSimulator() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapConfigure() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapSetup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMap() throws IOException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapCleanup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapClose() throws IOException {
+        // no-op
+    }
+
+    /**
+     * setConf() does not declare IOException to be thrown.
+     */
+    public void onCombineConfigure() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onCombineSetup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onCombine() throws IOException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onCombineCleanup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduceConfigure() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduceSetup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduce() throws IOException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduceCleanup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Error kind.
+     */
+    public enum Kind {
+        /** No error. */
+        Noop,
+
+        /** Runtime. */
+        Runtime,
+
+        /** IOException. */
+        IOException,
+
+        /** java.lang.Error. */
+        Error
+    }
+
+    /**
+     * Runtime error simulator.
+     */
+    public static class RuntimeExceptionBitHadoopErrorSimulator extends HadoopErrorSimulator {
+        /** Stage bits: define which map-reduce stages will cause errors. */
+        private final int bits;
+
+        /**
+         * Constructor.
+         */
+        protected RuntimeExceptionBitHadoopErrorSimulator(int b) {
+            bits = b;
+        }
+
+        /**
+         * Simulates an error.
+         */
+        protected void simulateError() throws IOException {
+            throw new RuntimeException("An error simulated by " + getClass().getSimpleName());
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapConfigure() {
+            try {
+                if ((bits & 1) != 0)
+                    simulateError();
+            }
+            catch (IOException e) {
+                // ignore
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapSetup() throws IOException, InterruptedException {
+            if ((bits & 2) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMap() throws IOException {
+            if ((bits & 4) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapCleanup() throws IOException, InterruptedException {
+            if ((bits & 8) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineConfigure() {
+            try {
+                if ((bits & 16) != 0)
+                    simulateError();
+            }
+            catch (IOException e) {
+                // ignore
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineSetup() throws IOException, InterruptedException {
+            if ((bits & 32) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombine() throws IOException {
+            if ((bits & 64) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineCleanup() throws IOException, InterruptedException {
+            if ((bits & 128) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceConfigure() {
+            try {
+                if ((bits & 256) != 0)
+                    simulateError();
+            }
+            catch (IOException e) {
+                // ignore
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceSetup() throws IOException, InterruptedException {
+            if ((bits & 512) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduce() throws IOException {
+            if ((bits & 1024) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceCleanup() throws IOException, InterruptedException {
+            if ((bits & 2048) != 0)
+                simulateError();
+        }
+    }
+
+    /**
+     * java.lang.Error simulator.
+     */
+    public static class ErrorBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator {
+        /**
+         * Constructor.
+         */
+        public ErrorBitHadoopErrorSimulator(int bits) {
+            super(bits);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void simulateError() {
+            throw new Error("An error simulated by " + getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * IOException simulator.
+     */
+    public static class IOExceptionBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator {
+        /**
+         * Constructor.
+         */
+        public IOExceptionBitHadoopErrorSimulator(int bits) {
+            super(bits);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void simulateError() throws IOException {
+            throw new IOException("An IOException simulated by " + getClass().getSimpleName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
new file mode 100644
index 0000000..946ba77
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Tests file systems for multi-threaded working directory support.
+ */
+public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
+    /** The number of threads. */
+    private static final int THREAD_COUNT = 3;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /**
+     * Test the file system with the specified URI for multi-threaded working directory support.
+     *
+     * @param uri Base URI of the file system (scheme and authority).
+     * @throws Exception If fails.
+     */
+    private void testFileSystem(final URI uri) throws Exception {
+        final Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP,
+            new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString());
+
+        final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT);
+        final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT);
+        final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT);
+        final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT);
+
+        final Path[] newUserInitWorkDir = new Path[THREAD_COUNT];
+        final Path[] newWorkDir = new Path[THREAD_COUNT];
+        final Path[] newAbsWorkDir = new Path[THREAD_COUNT];
+        final Path[] newInstanceWorkDir = new Path[THREAD_COUNT];
+
+        final AtomicInteger threadNum = new AtomicInteger(0);
+
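+        // Threads advance through the latch-gated phases in lock step, recording the working
+        // directory each one observes after every change; results are verified below.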
+        GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    int curThreadNum = threadNum.getAndIncrement();
+
+                    if ("file".equals(uri.getScheme()))
+                        FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum));
+
+                    changeUserPhase.countDown();
+                    changeUserPhase.await();
+
+                    newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
+
+                    FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum));
+
+                    changeDirPhase.countDown();
+                    changeDirPhase.await();
+
+                    newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
+
+                    FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum));
+
+                    changeAbsDirPhase.countDown();
+                    changeAbsDirPhase.await();
+
+                    newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
+
+                    newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory();
+
+                    finishPhase.countDown();
+                }
+                catch (InterruptedException | IOException e) {
+                    error("Failed to execute test thread.", e);
+
+                    fail();
+                }
+            }
+        }, THREAD_COUNT, "filesystems-test");
+
+        finishPhase.await();
+
+        for (int i = 0; i < THREAD_COUNT; i++) {
+            cfg.set(MRJobConfig.USER_NAME, "user" + i);
+
+            Path workDir = new Path(new Path(uri), "user/user" + i);
+
+            cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString());
+
+            assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory());
+
+            assertEquals(workDir, newUserInitWorkDir[i]);
+
+            assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]);
+
+            assertEquals(new Path("/folder" + i), newAbsWorkDir[i]);
+
+            assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]);
+        }
+
+        System.out.println(System.getProperty("user.dir"));
+    }
+
+    /**
+     * Test LocalFS multi-thread working directory.
+     *
+     * @throws Exception If fails.
+     */
+    public void testLocal() throws Exception {
+        testFileSystem(URI.create("file:///"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java
new file mode 100644
index 0000000..db87e33
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
+
+/**
+ * Grouping test.
+ */
+public class HadoopGroupingTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** */
+    private static final GridConcurrentHashSet<UUID> vals = HadoopSharedMap.map(HadoopGroupingTest.class)
+        .put("vals", new GridConcurrentHashSet<UUID>());
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGroupingReducer() throws Exception {
+        doTestGrouping(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGroupingCombiner() throws Exception {
+        doTestGrouping(true);
+    }
+
+    /**
+     * @param combiner Whether to test with a combiner.
+     * @throws Exception If failed.
+     */
+    public void doTestGrouping(boolean combiner) throws Exception {
+        vals.clear();
+
+        Job job = Job.getInstance();
+
+        job.setInputFormatClass(InFormat.class);
+        job.setOutputFormatClass(OutFormat.class);
+
+        job.setOutputKeyClass(YearTemperature.class);
+        job.setOutputValueClass(Text.class);
+
+        job.setMapperClass(Mapper.class);
+
+        if (combiner) {
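+            // Combiner-only mode: grouping is verified in the combiner and no reduce tasks run.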
+            job.setCombinerClass(MyReducer.class);
+            job.setNumReduceTasks(0);
+            job.setCombinerKeyGroupingComparatorClass(YearComparator.class);
+        }
+        else {
+            job.setReducerClass(MyReducer.class);
+            job.setNumReduceTasks(4);
+            job.setGroupingComparatorClass(YearComparator.class);
+        }
+
+        grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
+            createJobInfo(job.getConfiguration())).get(30000);
+
+        assertTrue(vals.isEmpty());
+    }
+
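+    /**
+     * Reducer that verifies grouping: values within a group must be unique and keys must arrive in year order.
+     */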
+    public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> {
+        /** */
+        int lastYear;
+
+        @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context)
+            throws IOException, InterruptedException {
+            X.println("___ : " + context.getTaskAttemptID() + " --> " + key);
+
+            Set<UUID> ids = new HashSet<>();
+
+            for (Text val : vals0)
+                assertTrue(ids.add(UUID.fromString(val.toString())));
+
+            for (Text val : vals0)
+                assertTrue(ids.remove(UUID.fromString(val.toString())));
+
+            assertTrue(ids.isEmpty());
+
+            assertTrue(key.year > lastYear);
+
+            lastYear = key.year;
+
+            for (Text val : vals0)
+                assertTrue(vals.remove(UUID.fromString(val.toString())));
+        }
+    }
+
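+    /**
+     * Grouping comparator: compares keys by year only.
+     */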
+    public static class YearComparator implements RawComparator<YearTemperature> {
+        /** {@inheritDoc} */
+        @Override public int compare(YearTemperature o1, YearTemperature o2) {
+            return Integer.compare(o1.year, o2.year);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            throw new IllegalStateException();
+        }
+    }
+
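+    /**
+     * Job key: partitioned and grouped by year, sorted by temperature in descending order within a year.
+     */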
+    public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable {
+        /** */
+        private int year;
+
+        /** */
+        private int temperature;
+
+        /** {@inheritDoc} */
+        @Override public void write(DataOutput out) throws IOException {
+            out.writeInt(year);
+            out.writeInt(temperature);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFields(DataInput in) throws IOException {
+            year = in.readInt();
+            temperature = in.readInt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            throw new IllegalStateException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() { // To be partitioned by year.
+            return year;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(YearTemperature o) {
+            int res = Integer.compare(year, o.year);
+
+            if (res != 0)
+                return res;
+
+            // Sort comparator by year and temperature, to find max for year.
+            return Integer.compare(o.temperature, temperature);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(YearTemperature.class, this);
+        }
+    }
+
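+    /**
+     * Input format producing random (year, temperature) keys with unique UUID values.
+     */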
+    public static class InFormat extends InputFormat<YearTemperature, Text> {
+        /** {@inheritDoc} */
+        @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+            ArrayList<InputSplit> list = new ArrayList<>();
+
+            for (int i = 0; i < 10; i++)
+                list.add(new HadoopSortingTest.FakeSplit(20));
+
+            return list;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split,
+            TaskAttemptContext context) throws IOException, InterruptedException {
+            return new RecordReader<YearTemperature, Text>() {
+                /** */
+                int cnt;
+
+                /** */
+                Random rnd = new GridRandom();
+
+                /** */
+                YearTemperature key = new YearTemperature();
+
+                /** */
+                Text val = new Text();
+
+                @Override public void initialize(InputSplit split, TaskAttemptContext context) {
+                    // No-op.
+                }
+
+                @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+                    return cnt++ < split.getLength();
+                }
+
+                @Override public YearTemperature getCurrentKey() {
+                    key.year = 1990 + rnd.nextInt(10);
+                    key.temperature = 10 + rnd.nextInt(20);
+
+                    return key;
+                }
+
+                @Override public Text getCurrentValue() {
+                    UUID id = UUID.randomUUID();
+
+                    assertTrue(vals.add(id));
+
+                    val.set(id.toString());
+
+                    return val;
+                }
+
+                @Override public float getProgress() {
+                    return 0;
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     * No-op output format that discards all output.
+     */
+    public static class OutFormat extends OutputFormat {
+        /** {@inheritDoc} */
+        @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java
new file mode 100644
index 0000000..9e268b7
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
+
+/**
+ * Job tracker self test.
+ */
+public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** Test block count. */
+    private static final int BLOCK_CNT = 10;
+
+    /** */
+    private static HadoopSharedMap m = HadoopSharedMap.map(HadoopJobTrackerSelfTest.class);
+
+    /** Map task execution count. */
+    private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger());
+
+    /** Reduce task execution count. */
+    private static final AtomicInteger reduceExecCnt = m.put("reduceExecCnt", new AtomicInteger());
+
+    /** Combine task execution count. */
+    private static final AtomicInteger combineExecCnt = m.put("combineExecCnt", new AtomicInteger());
+
+    /** */
+    private static final Map<String, CountDownLatch> latch = m.put("latch", new HashMap<String, CountDownLatch>());
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        latch.put("mapAwaitLatch", new CountDownLatch(1));
+        latch.put("reduceAwaitLatch", new CountDownLatch(1));
+        latch.put("combineAwaitLatch", new CountDownLatch(1));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        mapExecCnt.set(0);
+        combineExecCnt.set(0);
+        reduceExecCnt.set(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner());
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleTaskSubmit() throws Exception {
+        try {
+            UUID globalId = UUID.randomUUID();
+
+            Job job = Job.getInstance();
+            setupFileSystems(job.getConfiguration());
+
+            job.setMapperClass(TestMapper.class);
+            job.setReducerClass(TestReducer.class);
+            job.setInputFormatClass(InFormat.class);
+
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1"));
+
+            HadoopJobId jobId = new HadoopJobId(globalId, 1);
+
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+            checkStatus(jobId, false);
+
+            info("Releasing map latch.");
+
+            latch.get("mapAwaitLatch").countDown();
+
+            checkStatus(jobId, false);
+
+            info("Releasing reduce latch.");
+
+            latch.get("reduceAwaitLatch").countDown();
+
+            checkStatus(jobId, true);
+
+            assertEquals(10, mapExecCnt.get());
+            assertEquals(0, combineExecCnt.get());
+            assertEquals(1, reduceExecCnt.get());
+        }
+        finally {
+            // Safety.
+            latch.get("mapAwaitLatch").countDown();
+            latch.get("combineAwaitLatch").countDown();
+            latch.get("reduceAwaitLatch").countDown();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTaskWithCombinerPerMap() throws Exception {
+        try {
+            UUID globalId = UUID.randomUUID();
+
+            Job job = Job.getInstance();
+            setupFileSystems(job.getConfiguration());
+
+            job.setMapperClass(TestMapper.class);
+            job.setReducerClass(TestReducer.class);
+            job.setCombinerClass(TestCombiner.class);
+            job.setInputFormatClass(InFormat.class);
+
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2"));
+
+            HadoopJobId jobId = new HadoopJobId(globalId, 1);
+
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+            checkStatus(jobId, false);
+
+            info("Releasing map latch.");
+
+            latch.get("mapAwaitLatch").countDown();
+
+            checkStatus(jobId, false);
+
+            // All maps are completed. We have a combiner, so no reducers should be executed
+            // before the combiner latch is released.
+
+            U.sleep(50);
+
+            assertEquals(0, reduceExecCnt.get());
+
+            info("Releasing combiner latch.");
+
+            latch.get("combineAwaitLatch").countDown();
+
+            checkStatus(jobId, false);
+
+            info("Releasing reduce latch.");
+
+            latch.get("reduceAwaitLatch").countDown();
+
+            checkStatus(jobId, true);
+
+            assertEquals(10, mapExecCnt.get());
+            assertEquals(10, combineExecCnt.get());
+            assertEquals(1, reduceExecCnt.get());
+        }
+        finally {
+            // Safety.
+            latch.get("mapAwaitLatch").countDown();
+            latch.get("combineAwaitLatch").countDown();
+            latch.get("reduceAwaitLatch").countDown();
+        }
+    }
+
+    /**
+     * Checks job execution status.
+     *
+     * @param jobId Job ID.
+     * @param complete Completion status.
+     * @throws Exception If failed.
+     */
+    private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception {
+        for (int i = 0; i < gridCount(); i++) {
+            IgniteKernal kernal = (IgniteKernal)grid(i);
+
+            Hadoop hadoop = kernal.hadoop();
+
+            HadoopJobStatus stat = hadoop.status(jobId);
+
+            assert stat != null;
+
+            IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
+
+            if (!complete)
+                assertFalse(fut.isDone());
+            else {
+                info("Waiting for status future completion on node [idx=" + i 
+ ", nodeId=" +
+                    kernal.getLocalNodeId() + ']');
+
+                fut.get();
+            }
+        }
+    }
+
+    /**
+     * Test input format.
+     */
+    public static class InFormat extends InputFormat {
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
+            List<InputSplit> res = new ArrayList<>(BLOCK_CNT);
+
+            for (int i = 0; i < BLOCK_CNT; i++)
+                try {
+                    res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"}));
+                }
+                catch (URISyntaxException e) {
+                    throw new IOException(e);
+                }
+
+            return res;
+        }
+
+        @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException {
+            return new RecordReader() {
+                @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
+                }
+
+                @Override public boolean nextKeyValue() {
+                    return false;
+                }
+
+                @Override public Object getCurrentKey() {
+                    return null;
+                }
+
+                @Override public Object getCurrentValue() {
+                    return null;
+                }
+
+                @Override public float getProgress() {
+                    return 0;
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     * Test mapper.
+     */
+    private static class TestMapper extends Mapper {
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+            latch.get("mapAwaitLatch").await();
+
+            mapExecCnt.incrementAndGet();
+
+            System.out.println("Completed task: " + 
ctx.getTaskAttemptID().getTaskID().getId());
+        }
+    }
+
+    /**
+     * Test reducer.
+     */
+    private static class TestReducer extends Reducer {
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+            latch.get("reduceAwaitLatch").await();
+
+            reduceExecCnt.incrementAndGet();
+
+            System.out.println("Completed task: " + 
ctx.getTaskAttemptID().getTaskID().getId());
+        }
+    }
+
+    /**
+     * Test combiner.
+     */
+    private static class TestCombiner extends Reducer {
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+            latch.get("combineAwaitLatch").await();
+
+            combineExecCnt.incrementAndGet();
+
+            System.out.println("Completed task: " + 
ctx.getTaskAttemptID().getTaskID().getId());
+        }
+    }
+}
\ No newline at end of file
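
The job tracker test above coordinates the driver thread and the injected map/combine/reduce tasks through CountDownLatch instances kept in a shared map, releasing one phase at a time and asserting on the execution counters in between. A minimal self-contained sketch of that handshake, with hypothetical names and no Ignite dependencies, looks like this:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;

    /** Sketch of the latch-per-phase handshake used by HadoopJobTrackerSelfTest. */
    public class PhaseGateExample {
        /** Shared between driver and workers (the test uses HadoopSharedMap for the same purpose). */
        static final Map<String, CountDownLatch> LATCHES = new ConcurrentHashMap<>();

        /** Execution counter the driver asserts on. */
        static final AtomicInteger MAP_EXEC_CNT = new AtomicInteger();

        public static void main(String[] args) throws InterruptedException {
            LATCHES.put("mapAwaitLatch", new CountDownLatch(1));

            Thread mapper = new Thread(() -> {
                try {
                    LATCHES.get("mapAwaitLatch").await(); // Task blocks until the driver releases it.

                    MAP_EXEC_CNT.incrementAndGet();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            mapper.start();

            if (MAP_EXEC_CNT.get() != 0)
                throw new AssertionError("Phase ran before it was released.");

            LATCHES.get("mapAwaitLatch").countDown(); // Release the gated phase.

            mapper.join();

            if (MAP_EXEC_CNT.get() != 1)
                throw new AssertionError("Phase did not run exactly once.");
        }
    }

The real test presumably keeps its state in HadoopSharedMap rather than in plain static fields because tasks may run under a different class loader than the test class, where ordinary statics would not be visible.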

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
new file mode 100644
index 0000000..25ef382
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
+
+/**
+ * Tests map-reduce execution with embedded mode.
+ */
+public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
+    /** */
+    private static Map<String, Boolean> flags = 
HadoopSharedMap.map(HadoopMapReduceEmbeddedSelfTest.class)
+        .put("flags", new HashMap<String, Boolean>());
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * Tests whole job execution, with all phases, in both the old and new versions of the API,
+     * with custom Serialization, Partitioner and IO formats defined.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultiReducerWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, 
"key3", 15000, "key4", 7000, "key5", 12000,
+            "key6", 18000 );
+
+        for (int i = 0; i < 2; i++) {
+            boolean useNewAPI = i == 1;
+
+            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+
+            flags.put("serializationWasConfigured", false);
+            flags.put("partitionerWasConfigured", false);
+            flags.put("inputFormatWasConfigured", false);
+            flags.put("outputFormatWasConfigured", false);
+
+            JobConf jobConf = new JobConf();
+
+            jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
+
+            // To split into about 6-7 items for v2:
+            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+            // For v1:
+            jobConf.setInt("fs.local.block.size", 65000);
+
+            // File system coordinates.
+            setupFileSystems(jobConf);
+
+            HadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI);
+
+            if (!useNewAPI) {
+                jobConf.setPartitionerClass(CustomV1Partitioner.class);
+                jobConf.setInputFormat(CustomV1InputFormat.class);
+                jobConf.setOutputFormat(CustomV1OutputFormat.class);
+            }
+
+            Job job = Job.getInstance(jobConf);
+
+            HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false);
+
+            if (useNewAPI) {
+                job.setPartitionerClass(CustomV2Partitioner.class);
+                job.setInputFormatClass(CustomV2InputFormat.class);
+                job.setOutputFormatClass(CustomV2OutputFormat.class);
+            }
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
+
+            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+            job.setNumReduceTasks(3);
+
+            job.setJarByClass(HadoopWordCount2.class);
+
+            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new 
HadoopJobId(UUID.randomUUID(), 1),
+                    createJobInfo(job.getConfiguration()));
+
+            fut.get();
+
+            assertTrue("Serialization was configured (new API is " + useNewAPI 
+ ")",
+                 flags.get("serializationWasConfigured"));
+
+            assertTrue("Partitioner was configured (new API is = " + useNewAPI 
+ ")",
+                 flags.get("partitionerWasConfigured"));
+
+            assertTrue("Input format was configured (new API is = " + 
useNewAPI + ")",
+                 flags.get("inputFormatWasConfigured"));
+
+            assertTrue("Output format was configured (new API is = " + 
useNewAPI + ")",
+                 flags.get("outputFormatWasConfigured"));
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key3\t15000\n" +
+                "key6\t18000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : 
"part-") + "00000")
+            );
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key1\t10000\n" +
+                "key4\t7000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : 
"part-") + "00001")
+            );
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key2\t20000\n" +
+                "key5\t12000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : 
"part-") + "00002")
+            );
+
+        }
+    }
+
+    /**
+     * Custom serialization class that inherits behaviour of native {@link WritableSerialization}.
+     */
+    protected static class CustomSerialization extends WritableSerialization {
+        @Override public void setConf(Configuration conf) {
+            super.setConf(conf);
+
+            flags.put("serializationWasConfigured", true);
+        }
+    }
+
+    /**
+     * Custom implementation of Partitioner in v1 API.
+     */
+    private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            flags.put("partitionerWasConfigured", true);
+        }
+    }
+
+    /**
+     * Custom implementation of Partitioner in v2 API.
+     */
+    private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
+            implements Configurable {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            flags.put("partitionerWasConfigured", true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * Custom implementation of InputFormat in v2 API.
+     */
+    private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            flags.put("inputFormatWasConfigured", true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * Custom implementation of OutputFormat in v2 API.
+     */
+    private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            flags.put("outputFormatWasConfigured", true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * Custom implementation of InputFormat in v1 API.
+     */
+    private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            super.configure(job);
+
+            flags.put("inputFormatWasConfigured", true);
+        }
+    }
+
+    /**
+     * Custom implementation of OutputFormat in v1 API.
+     */
+    private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            flags.put("outputFormatWasConfigured", true);
+        }
+    }
+}
\ No newline at end of file
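
The embedded map-reduce test verifies not only the job output but also that the framework actually called back into the custom Serialization, Partitioner and IO formats, by having each component flip a flag in a shared map from its configure()/setConf() hook. A minimal sketch of that verification idiom (hypothetical class and flag names; the Configurable interface is the standard Hadoop API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;

    /** Sketch: a component that records that the framework configured it, as the custom formats above do. */
    public class RecordingConfigurable implements Configurable {
        /** Flags the test asserts on after the job completes. */
        static final Map<String, Boolean> FLAGS = new ConcurrentHashMap<>();

        private Configuration conf;

        /** Called by the Hadoop framework when the component is instantiated for a task. */
        @Override public void setConf(Configuration conf) {
            this.conf = conf;

            FLAGS.put("wasConfigured", true);
        }

        @Override public Configuration getConf() {
            return conf;
        }
    }

After fut.get() returns, an assertTrue(FLAGS.get("wasConfigured")) proves the hook actually ran; a passing job by itself would not distinguish the custom component from the default one.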

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
new file mode 100644
index 0000000..dd12935
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
+
+/**
+ * Tests error resilience of map-reduce job execution.
+ * Combinations tested:
+ * { new API, old API }
+ *   x { unchecked exception, checked exception, error }
+ *   x { phase where the error happens }.
+ */
+public class HadoopMapReduceErrorResilienceTest extends HadoopAbstractMapReduceTest {
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_IOException() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Error() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_IOException() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_Error() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /**
+     * Tests correct recovery after an error.
+     *
+     * @param useNewBits Bit mask controlling which of mapper/combiner/reducer use the new API (a cleared bit selects the new API).
+     * @param simulatorKind Kind of error to simulate.
+     * @throws Exception On error.
+     */
+    private void doTestRecoveryAfterAnError(int useNewBits, HadoopErrorSimulator.Kind simulatorKind) throws Exception {
+        try {
+            IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+            igfs.mkdirs(inDir);
+
+            IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+            generateTestFile(inFile.toString(), "red", red, "blue", blue, 
"green", green, "yellow", yellow);
+
+            boolean useNewMapper = (useNewBits & 1) == 0;
+            boolean useNewCombiner = (useNewBits & 2) == 0;
+            boolean useNewReducer = (useNewBits & 4) == 0;
+
+            for (int i = 0; i < 12; i++) {
+                int bits = 1 << i;
+
+                System.out.println("############################ Simulator 
kind = " + simulatorKind
+                    + ", Stage bits = " + bits);
+
+                HadoopErrorSimulator sim = HadoopErrorSimulator.create(simulatorKind, bits);
+
+                doTestWithErrorSimulator(sim, inFile, useNewMapper, useNewCombiner, useNewReducer);
+            }
+        }
+        catch (Throwable t) {
+            t.printStackTrace();
+
+            fail("Unexpected throwable: " + t);
+        }
+    }
+
+    /**
+     * Performs test with given error simulator.
+     *
+     * @param sim The simulator.
+     * @param inFile Input file.
+     * @param useNewMapper Whether to use the new mapper API.
+     * @param useNewCombiner Whether to use the new combiner API.
+     * @param useNewReducer Whether to use the new reducer API.
+     * @throws Exception If failed.
+     */
+    private void doTestWithErrorSimulator(HadoopErrorSimulator sim, IgfsPath inFile, boolean useNewMapper,
+        boolean useNewCombiner, boolean useNewReducer) throws Exception {
+        // Install the simulator that actually injects errors:
+        assertTrue(HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim));
+
+        try {
+            // Expect failure there:
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+        }
+        catch (Throwable t) { // This may be an Error.
+            // Expected:
+            System.out.println(t.toString()); // Ignore, continue the test.
+        }
+
+        // Set no-op error simulator:
+        assertTrue(HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance));
+
+        // Expect success there:
+        doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+    }
+}
\ No newline at end of file
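
The resilience test's inner loop walks bits = 1 << i for i in [0, 12), so each iteration arms the error simulator for exactly one task stage. A sketch of that decoding follows; the stage names here are illustrative assumptions, since HadoopErrorSimulator's actual stage-to-bit mapping is not part of this diff:

    /** Sketch of the one-bit-per-stage selection the resilience loop relies on. */
    public class StageBitsExample {
        /** Illustrative stage names; the real mapping lives in HadoopErrorSimulator. */
        static final String[] STAGES = {
            "mapConfigure", "map", "mapCleanup",
            "combineConfigure", "combine", "combineCleanup",
            "reduceConfigure", "reduce", "reduceCleanup",
            "setupTask", "commitTask", "abortTask"
        };

        public static void main(String[] args) {
            for (int i = 0; i < 12; i++) {
                int bits = 1 << i; // Exactly one stage bit set per iteration.

                for (int s = 0; s < STAGES.length; s++) {
                    if ((bits & (1 << s)) != 0)
                        System.out.println("Injecting error into stage: " + STAGES[s]);
                }
            }
        }
    }

Testing one stage at a time keeps failures attributable: if recovery breaks, the printed simulator kind and stage bits identify the exact phase that was poisoned.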

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
new file mode 100644
index 0000000..b703896
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
+
+/**
+ * Test of whole cycle of map-reduce processing via Job tracker.
+ */
+public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
+    /**
+     * Tests whole job execution, with all phases, in several combinations of the new and old API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
+        for (boolean[] apiMode: getApiModes()) {
+            assert apiMode.length == 3;
+
+            boolean useNewMapper = apiMode[0];
+            boolean useNewCombiner = apiMode[1];
+            boolean useNewReducer = apiMode[2];
+
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+        }
+    }
+
+    /**
+     * Gets API mode combinations to be tested.
+     * Each boolean[] is a { newMapper, newCombiner, newReducer } flag triplet.
+     *
+     * @return Arrays of booleans indicating API combinations to test.
+     */
+    protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, false },
+            { false, false, true },
+            { false, true,  false },
+            { true,  false, false },
+            { true,  true,  true },
+        };
+    }
+}
\ No newline at end of file
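
getApiModes() above deliberately returns five of the eight possible { newMapper, newCombiner, newReducer } triplets. If exhaustive coverage were ever wanted, the full cross-product can be derived from a three-bit counter; a sketch (hypothetical helper, not part of the test):

    import java.util.ArrayList;
    import java.util.List;

    /** Sketch: generating all 2^3 API mode triplets from a bit counter. */
    public class ApiModesExample {
        static List<boolean[]> allApiModes() {
            List<boolean[]> modes = new ArrayList<>(8);

            for (int bits = 0; bits < 8; bits++) {
                modes.add(new boolean[] {
                    (bits & 1) != 0, // newMapper
                    (bits & 2) != 0, // newCombiner
                    (bits & 4) != 0  // newReducer
                });
            }

            return modes;
        }

        public static void main(String[] args) {
            for (boolean[] m : allApiModes())
                System.out.println("mapper=" + m[0] + " combiner=" + m[1] + " reducer=" + m[2]);
        }
    }

A subclass overriding getApiModes() with this full set would run the same doTest() body eight times instead of five, trading run time for coverage.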

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java
new file mode 100644
index 0000000..0c172c3
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Tests an attempt to execute a map-reduce task when no Hadoop processor is available.
+ */
+public class HadoopNoHadoopMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        c.setHadoopConfiguration(null);
+        c.setPeerClassLoadingEnabled(true);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testWholeMapReduceExecution() throws Exception {
+        try {
+            super.testWholeMapReduceExecution();
+
+            fail("IllegalStateException expected.");
+        }
+        catch (IllegalStateException ignore) {
+            // No-op.
+        }
+    }
+}
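
The override above uses the classic JUnit 3 expected-exception idiom: run the parent test, fail() if it returns normally, and swallow only the expected type. A reusable plain-Java sketch of the same idiom (hypothetical helper; Ignite's own test framework offers comparable utilities):

    /** Sketch of the expected-exception idiom used by HadoopNoHadoopMapReduceTest. */
    public class ExpectedFailureExample {
        interface ThrowingRunnable {
            void run() throws Exception;
        }

        /** Passes only if body throws an exception of the expected type. */
        static void assertThrows(Class<? extends Exception> expected, ThrowingRunnable body) throws Exception {
            try {
                body.run();
            }
            catch (Exception e) {
                if (expected.isInstance(e))
                    return; // Expected failure observed: test passes.

                throw e; // Unexpected type: propagate.
            }

            throw new AssertionError(expected.getSimpleName() + " expected, but nothing was thrown.");
        }

        public static void main(String[] args) throws Exception {
            assertThrows(IllegalStateException.class, () -> {
                throw new IllegalStateException("Hadoop processor is not available.");
            });
        }
    }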
