IGNITE-3916: Moved MR planner to common module.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b66b969
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b66b969
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b66b969

Branch: refs/heads/ignite-3916
Commit: 4b66b969cb447895bfee142701633d99885cdf12
Parents: 5515280
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Fri Sep 16 16:09:34 2016 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Fri Sep 16 16:09:34 2016 +0300

----------------------------------------------------------------------
 .../mapreduce/IgniteHadoopMapReducePlanner.java | 416 ---------
 .../IgniteHadoopWeightedMapReducePlanner.java   | 846 -------------------
 .../internal/processors/hadoop/HadoopUtils.java |  75 --
 .../hadoop/igfs/HadoopIgfsEndpoint.java         | 210 -----
 .../planner/HadoopAbstractMapReducePlanner.java | 116 ---
 .../planner/HadoopDefaultMapReducePlan.java     | 109 ---
 .../planner/HadoopMapReducePlanGroup.java       | 150 ----
 .../planner/HadoopMapReducePlanTopology.java    |  89 --
 .../mapreduce/IgniteHadoopMapReducePlanner.java | 416 +++++++++
 .../IgniteHadoopWeightedMapReducePlanner.java   | 846 +++++++++++++++++++
 .../processors/hadoop/HadoopCommonUtils.java    | 110 +++
 .../hadoop/igfs/HadoopIgfsEndpoint.java         | 210 +++++
 .../planner/HadoopAbstractMapReducePlanner.java | 116 +++
 .../planner/HadoopDefaultMapReducePlan.java     | 110 +++
 .../planner/HadoopMapReducePlanGroup.java       | 150 ++++
 .../planner/HadoopMapReducePlanTopology.java    |  89 ++
 16 files changed, 2047 insertions(+), 2011 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
deleted file mode 100644
index d4a44fa..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.mapreduce;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
-
-/**
- * Default map-reduce planner implementation.
- */
-public class IgniteHadoopMapReducePlanner extends HadoopAbstractMapReducePlanner {
-    /** {@inheritDoc} */
-    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
-        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
-        // Convert collection of topology nodes to collection of topology node IDs.
-        Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f);
-
-        for (ClusterNode topNode : top)
-            topIds.add(topNode.id());
-
-        Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(top, topIds, job.input());
-
-        int rdcCnt = job.info().reducers();
-
-        if (rdcCnt < 0)
-            throw new IgniteCheckedException("Number of reducers must be 
non-negative, actual: " + rdcCnt);
-
-        Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt);
-
-        return new HadoopDefaultMapReducePlan(mappers, reducers);
-    }
-
-    /**
-     * Create plan for mappers.
-     *
-     * @param top Topology nodes.
-     * @param topIds Topology node IDs.
-     * @param splits Splits.
-     * @return Mappers map.
-     * @throws IgniteCheckedException If failed.
-     */
-    private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
-        Iterable<HadoopInputSplit> splits) throws IgniteCheckedException {
-        Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
-
-        Map<String, Collection<UUID>> nodes = groupByHost(top);
-
-        Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load.
-
-        for (UUID nodeId : topIds)
-            nodeLoads.put(nodeId, 0);
-
-        for (HadoopInputSplit split : splits) {
-            UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);
-
-            if (log.isDebugEnabled())
-                log.debug("Mapped split to node [split=" + split + ", nodeId=" 
+ nodeId + ']');
-
-            Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId);
-
-            if (nodeSplits == null) {
-                nodeSplits = new ArrayList<>();
-
-                mappers.put(nodeId, nodeSplits);
-            }
-
-            nodeSplits.add(split);
-
-            // Updated node load.
-            nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1);
-        }
-
-        return mappers;
-    }
-
-    /**
-     * Determine the best node for this split.
-     *
-     * @param split Split.
-     * @param topIds Topology node IDs.
-     * @param nodes Nodes.
-     * @param nodeLoads Node load tracker.
-     * @return Node ID.
-     */
-    @SuppressWarnings("unchecked")
-    private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes,
-        Map<UUID, Integer> nodeLoads) throws IgniteCheckedException {
-        if (split instanceof HadoopFileBlock) {
-            HadoopFileBlock split0 = (HadoopFileBlock)split;
-
-            if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
-                HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority());
-
-                IgfsEx igfs = null;
-
-                if (F.eq(ignite.name(), endpoint.grid()))
-                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
-
-                if (igfs != null && !igfs.isProxy(split0.file())) {
-                    IgfsPath path = new IgfsPath(split0.file());
-
-                    if (igfs.exists(path)) {
-                        Collection<IgfsBlockLocation> blocks;
-
-                        try {
-                            blocks = igfs.affinity(path, split0.start(), split0.length());
-                        }
-                        catch (IgniteException e) {
-                            throw new IgniteCheckedException(e);
-                        }
-
-                        assert blocks != null;
-
-                        if (blocks.size() == 1)
-                            // Fast-path, split consists of one IGFS block (as in most cases).
-                            return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false);
-                        else {
-                            // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes.
-                            Map<UUID, Long> nodeMap = new HashMap<>();
-
-                            List<UUID> bestNodeIds = null;
-                            long bestLen = -1L;
-
-                            for (IgfsBlockLocation block : blocks) {
-                                for (UUID blockNodeId : block.nodeIds()) {
-                                    if (topIds.contains(blockNodeId)) {
-                                        Long oldLen = nodeMap.get(blockNodeId);
-                                        long newLen = oldLen == null ? block.length() : oldLen + block.length();
-
-                                        nodeMap.put(blockNodeId, newLen);
-
-                                        if (bestNodeIds == null || bestLen < newLen) {
-                                            bestNodeIds = new ArrayList<>(1);
-
-                                            bestNodeIds.add(blockNodeId);
-
-                                            bestLen = newLen;
-                                        }
-                                        else if (bestLen == newLen) {
-                                            assert !F.isEmpty(bestNodeIds);
-
-                                            bestNodeIds.add(blockNodeId);
-                                        }
-                                    }
-                                }
-                            }
-
-                            if (bestNodeIds != null) {
-                                return bestNodeIds.size() == 1 ? bestNodeIds.get(0) :
-                                    bestNode(bestNodeIds, topIds, nodeLoads, true);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        // Cannot use local IGFS for some reason, try selecting the node by host.
-        Collection<UUID> blockNodes = null;
-
-        for (String host : split.hosts()) {
-            Collection<UUID> hostNodes = nodes.get(host);
-
-            if (!F.isEmpty(hostNodes)) {
-                if (blockNodes == null)
-                    blockNodes = new ArrayList<>(hostNodes);
-                else
-                    blockNodes.addAll(hostNodes);
-            }
-        }
-
-        return bestNode(blockNodes, topIds, nodeLoads, false);
-    }
-
-    /**
-     * Finds the best (the least loaded) node among the candidates.
-     *
-     * @param candidates Candidates.
-     * @param topIds Topology node IDs.
-     * @param nodeLoads Known node loads.
-     * @param skipTopCheck Whether to skip topology check.
-     * @return The best node.
-     */
-    private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads,
-        boolean skipTopCheck) {
-        UUID bestNode = null;
-        int bestLoad = Integer.MAX_VALUE;
-
-        if (candidates != null) {
-            for (UUID candidate : candidates) {
-                if (skipTopCheck || topIds.contains(candidate)) {
-                    int load = nodeLoads.get(candidate);
-
-                    if (bestNode == null || bestLoad > load) {
-                        bestNode = candidate;
-                        bestLoad = load;
-
-                        if (bestLoad == 0)
-                            break; // Minimum load possible, no need for further iterations.
-                    }
-                }
-            }
-        }
-
-        if (bestNode == null) {
-            // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one.
-            bestLoad = Integer.MAX_VALUE;
-
-            for (UUID nodeId : topIds) {
-                int load = nodeLoads.get(nodeId);
-
-                if (bestNode == null || bestLoad > load) {
-                    bestNode = nodeId;
-                    bestLoad = load;
-
-                    if (bestLoad == 0)
-                        break; // Minimum load possible, no need for further iterations.
-                }
-            }
-        }
-
-        assert bestNode != null;
-
-        return bestNode;
-    }
-
-    /**
-     * Create plan for reducers.
-     *
-     * @param top Topology.
-     * @param mappers Mappers map.
-     * @param reducerCnt Reducers count.
-     * @return Reducers map.
-     */
-    private Map<UUID, int[]> reducers(Collection<ClusterNode> top,
-        Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) {
-        // Determine initial node weights.
-        int totalWeight = 0;
-
-        List<WeightedNode> nodes = new ArrayList<>(top.size());
-
-        for (ClusterNode node : top) {
-            Collection<HadoopInputSplit> split = mappers.get(node.id());
-
-            int weight = reducerNodeWeight(node, split != null ? split.size() : 0);
-
-            nodes.add(new WeightedNode(node.id(), weight, weight));
-
-            totalWeight += weight;
-        }
-
-        // Adjust weights.
-        int totalAdjustedWeight = 0;
-
-        for (WeightedNode node : nodes) {
-            node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight;
-
-            node.weight = Math.round(node.floatWeight);
-
-            totalAdjustedWeight += node.weight;
-        }
-
-        // Apply redundant/lost reducers.
-        Collections.sort(nodes);
-
-        if (totalAdjustedWeight > reducerCnt) {
-            // Too many reducers set.
-            ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1);
-
-            while (totalAdjustedWeight != reducerCnt) {
-                if (!iter.hasPrevious())
-                    iter = nodes.listIterator(nodes.size() - 1);
-
-                WeightedNode node = iter.previous();
-
-                if (node.weight > 0) {
-                    node.weight -= 1;
-
-                    totalAdjustedWeight--;
-                }
-            }
-        }
-        else if (totalAdjustedWeight < reducerCnt) {
-            // Not enough reducers set.
-            ListIterator<WeightedNode> iter = nodes.listIterator(0);
-
-            while (totalAdjustedWeight != reducerCnt) {
-                if (!iter.hasNext())
-                    iter = nodes.listIterator(0);
-
-                WeightedNode node = iter.next();
-
-                if (node.floatWeight > 0.0f) {
-                    node.weight += 1;
-
-                    totalAdjustedWeight++;
-                }
-            }
-        }
-
-        int idx = 0;
-
-        Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f);
-
-        for (WeightedNode node : nodes) {
-            if (node.weight > 0) {
-                int[] arr = new int[node.weight];
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = idx++;
-
-                reducers.put(node.nodeId, arr);
-            }
-        }
-
-        return reducers;
-    }
-
-    /**
-     * Calculate node weight based on node metrics and data co-location.
-     *
-     * @param node Node.
-     * @param splitCnt Splits mapped to this node.
-     * @return Node weight.
-     */
-    @SuppressWarnings("UnusedParameters")
-    protected int reducerNodeWeight(ClusterNode node, int splitCnt) {
-        return splitCnt;
-    }
-
-    /**
-     * Weighted node.
-     */
-    private static class WeightedNode implements Comparable<WeightedNode> {
-        /** Node ID. */
-        private final UUID nodeId;
-
-        /** Weight. */
-        private int weight;
-
-        /** Floating point weight. */
-        private float floatWeight;
-
-        /**
-         * Constructor.
-         *
-         * @param nodeId Node ID.
-         * @param weight Weight.
-         * @param floatWeight Floating point weight.
-         */
-        private WeightedNode(UUID nodeId, int weight, float floatWeight) {
-            this.nodeId = nodeId;
-            this.weight = weight;
-            this.floatWeight = floatWeight;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return nodeId.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int compareTo(@NotNull WeightedNode other) {
-            float res = other.floatWeight - floatWeight;
-
-            return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId);
-        }
-    }
-}
\ No newline at end of file
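
For reference, the reducer step in the deleted IgniteHadoopMapReducePlanner above is essentially a proportional split of the requested reducer count across nodes by mapper count, followed by a rounding correction. Below is a minimal, self-contained sketch of that arithmetic only; the class and node names are made up for illustration and this is not the planner API itself.

import java.util.LinkedHashMap;
import java.util.Map;

public class ReducerWeightSketch {
    public static void main(String[] args) {
        // Hypothetical mapper counts per node (stand-in for reducerNodeWeight(node, splitCnt)).
        Map<String, Integer> mappersPerNode = new LinkedHashMap<>();
        mappersPerNode.put("nodeA", 6);
        mappersPerNode.put("nodeB", 3);
        mappersPerNode.put("nodeC", 1);

        int reducerCnt = 8;
        int totalWeight = mappersPerNode.values().stream().mapToInt(Integer::intValue).sum();

        // "Adjust weights" step: scale each node's weight to the requested reducer
        // count and round. The real planner then walks the sorted node list and
        // adds/removes one reducer at a time until the rounded total matches reducerCnt.
        Map<String, Integer> reducers = new LinkedHashMap<>();

        for (Map.Entry<String, Integer> e : mappersPerNode.entrySet())
            reducers.put(e.getKey(), Math.round(((float)e.getValue() * reducerCnt) / totalWeight));

        System.out.println(reducers); // {nodeA=5, nodeB=2, nodeC=1} - 8 reducers in total.
    }
}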

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
deleted file mode 100644
index 27ffc19..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ /dev/null
@@ -1,846 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.mapreduce;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * Map-reduce planner which assigns mappers and reducers based on their "weights". A weight describes how many resources
- * are required to execute a particular map or reduce task.
- * <p>
- * Plan creation consists of two steps: assigning mappers and assigning reducers.
- * <p>
- * Mappers are assigned based on input split data location. For each input split we search for nodes where
- * its data is stored. The planner tries to assign mappers to their affinity nodes first. This process is governed by two
- * properties:
- * <ul>
- *     <li><b>{@code localMapperWeight}</b> - weight of a map task when it is executed on an affinity node;</li>
- *     <li><b>{@code remoteMapperWeight}</b> - weight of a map task when it is executed on a non-affinity node.</li>
- * </ul>
- * The planning algorithm assigns mappers so that the total resulting weight across all nodes is the minimum possible.
- * <p>
- * Reducers are assigned differently. First we try to distribute reducers across nodes with mappers. This approach
- * could minimize expensive data transfer over the network. A reducer assigned to a node with a mapper is considered
- * <b>{@code local}</b>. Otherwise it is considered <b>{@code remote}</b>. This process continues until a certain weight
- * threshold is reached, which means that the current node is already too busy and should not have higher priority over
- * other nodes any more. The threshold can be configured using the <b>{@code preferLocalReducerThresholdWeight}</b> property.
- * <p>
- * When the local reducer threshold is reached on all nodes, we distribute the remaining reducers based on their local and
- * remote weights in the same way as it is done for mappers. This process is governed by two
- * properties:
- * <ul>
- *     <li><b>{@code localReducerWeight}</b> - weight of a reduce task when it is executed on a node with mappers;</li>
- *     <li><b>{@code remoteReducerWeight}</b> - weight of a reduce task when it is executed on a node without mappers.</li>
- * </ul>
- */
-public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReducePlanner {
-    /** Default local mapper weight. */
-    public static final int DFLT_LOC_MAPPER_WEIGHT = 100;
-
-    /** Default remote mapper weight. */
-    public static final int DFLT_RMT_MAPPER_WEIGHT = 100;
-
-    /** Default local reducer weight. */
-    public static final int DFLT_LOC_REDUCER_WEIGHT = 100;
-
-    /** Default remote reducer weight. */
-    public static final int DFLT_RMT_REDUCER_WEIGHT = 100;
-
-    /** Default reducer migration threshold weight. */
-    public static final int DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT = 200;
-
-    /** Local mapper weight. */
-    private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT;
-
-    /** Remote mapper weight. */
-    private int rmtMapperWeight = DFLT_RMT_MAPPER_WEIGHT;
-
-    /** Local reducer weight. */
-    private int locReducerWeight = DFLT_LOC_REDUCER_WEIGHT;
-
-    /** Remote reducer weight. */
-    private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT;
-
-    /** Reducer migration threshold weight. */
-    private int preferLocReducerThresholdWeight = DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT;
-
-    /** {@inheritDoc} */
-    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
-        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
-        List<HadoopInputSplit> splits = HadoopUtils.sortInputSplits(job.input());
-        int reducerCnt = job.info().reducers();
-
-        if (reducerCnt < 0)
-            throw new IgniteCheckedException("Number of reducers must be 
non-negative, actual: " + reducerCnt);
-
-        HadoopMapReducePlanTopology top = topology(nodes);
-
-        Mappers mappers = assignMappers(splits, top);
-
-        Map<UUID, int[]> reducers = assignReducers(splits, top, mappers, reducerCnt);
-
-        return new HadoopDefaultMapReducePlan(mappers.nodeToSplits, reducers);
-    }
-
-    /**
-     * Assign mappers to nodes.
-     *
-     * @param splits Input splits.
-     * @param top Topology.
-     * @return Mappers.
-     * @throws IgniteCheckedException If failed.
-     */
-    private Mappers assignMappers(Collection<HadoopInputSplit> splits,
-        HadoopMapReducePlanTopology top) throws IgniteCheckedException {
-        Mappers res = new Mappers();
-
-        for (HadoopInputSplit split : splits) {
-            // Try getting IGFS affinity.
-            Collection<UUID> nodeIds = affinityNodesForSplit(split, top);
-
-            // Get best node.
-            UUID node = bestMapperNode(nodeIds, top);
-
-            assert node != null;
-
-            res.add(split, node);
-        }
-
-        return res;
-    }
-
-    /**
-     * Get affinity nodes for the given input split.
-     * <p>
-     * Order in the returned collection *is* significant, meaning that nodes containing more data
-     * go first. This way, the first nodes in the collection are considered preferable for scheduling.
-     *
-     * @param split Split.
-     * @param top Topology.
-     * @return Affinity nodes.
-     * @throws IgniteCheckedException If failed.
-     */
-    private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top)
-        throws IgniteCheckedException {
-        Collection<UUID> igfsNodeIds = igfsAffinityNodesForSplit(split);
-
-        if (igfsNodeIds != null)
-            return igfsNodeIds;
-
-        Map<NodeIdAndLength, UUID> res = new TreeMap<>();
-
-        for (String host : split.hosts()) {
-            long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0L;
-
-            HadoopMapReducePlanGroup grp = top.groupForHost(host);
-
-            if (grp != null) {
-                for (int i = 0; i < grp.nodeCount(); i++) {
-                    UUID nodeId = grp.nodeId(i);
-
-                    res.put(new NodeIdAndLength(nodeId, len), nodeId);
-                }
-            }
-        }
-
-        return new LinkedHashSet<>(res.values());
-    }
-
-    /**
-     * Get IGFS affinity nodes for split if possible.
-     * <p>
-     * Order in the returned collection *is* significant, meaning that nodes containing more data
-     * go first. This way, the first nodes in the collection are considered preferable for scheduling.
-     *
-     * @param split Input split.
-     * @return IGFS affinity or {@code null} if IGFS is not available.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable private Collection<UUID> igfsAffinityNodesForSplit(HadoopInputSplit split) throws IgniteCheckedException {
-        if (split instanceof HadoopFileBlock) {
-            HadoopFileBlock split0 = (HadoopFileBlock)split;
-
-            if (IgniteFileSystem.IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
-                HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority());
-
-                IgfsEx igfs = null;
-
-                if (F.eq(ignite.name(), endpoint.grid()))
-                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
-
-                if (igfs != null && !igfs.isProxy(split0.file())) {
-                    IgfsPath path = new IgfsPath(split0.file());
-
-                    if (igfs.exists(path)) {
-                        Collection<IgfsBlockLocation> blocks;
-
-                        try {
-                            blocks = igfs.affinity(path, split0.start(), split0.length());
-                        }
-                        catch (IgniteException e) {
-                            throw new IgniteCheckedException("Failed to get 
IGFS file block affinity [path=" + path +
-                                ", start=" + split0.start() + ", len=" + 
split0.length() + ']', e);
-                        }
-
-                        assert blocks != null;
-
-                        if (blocks.size() == 1)
-                            return blocks.iterator().next().nodeIds();
-                        else {
-                            // The most "local" nodes go first.
-                            Map<UUID, Long> idToLen = new HashMap<>();
-
-                            for (IgfsBlockLocation block : blocks) {
-                                for (UUID id : block.nodeIds()) {
-                                    Long len = idToLen.get(id);
-
-                                    idToLen.put(id, len == null ? block.length() : block.length() + len);
-                                }
-                            }
-
-                            // Sort the nodes in non-ascending order by contained data lengths.
-                            Map<NodeIdAndLength, UUID> res = new TreeMap<>();
-
-                            for (Map.Entry<UUID, Long> idToLenEntry : idToLen.entrySet()) {
-                                UUID id = idToLenEntry.getKey();
-
-                                res.put(new NodeIdAndLength(id, idToLenEntry.getValue()), id);
-                            }
-
-                            return new LinkedHashSet<>(res.values());
-                        }
-                    }
-                }
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * Find best mapper node.
-     *
-     * @param affIds Affinity node IDs.
-     * @param top Topology.
-     * @return Result.
-     */
-    private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) {
-        // Priority node.
-        UUID prioAffId = F.first(affIds);
-
-        // Find group with the least weight.
-        HadoopMapReducePlanGroup resGrp = null;
-        MapperPriority resPrio = MapperPriority.NORMAL;
-        int resWeight = Integer.MAX_VALUE;
-
-        for (HadoopMapReducePlanGroup grp : top.groups()) {
-            MapperPriority prio = groupPriority(grp, affIds, prioAffId);
-
-            int weight = grp.weight() + (prio == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight);
-
-            if (resGrp == null || weight < resWeight || weight == resWeight && prio.value() > resPrio.value()) {
-                resGrp = grp;
-                resPrio = prio;
-                resWeight = weight;
-            }
-        }
-
-        assert resGrp != null;
-
-        // Update group weight for further runs.
-        resGrp.weight(resWeight);
-
-        // Return the best node from the group.
-        return bestMapperNodeForGroup(resGrp, resPrio, affIds, prioAffId);
-    }
-
-    /**
-     * Get best node in the group.
-     *
-     * @param grp Group.
-     * @param priority Priority.
-     * @param affIds Affinity IDs.
-     * @param prioAffId Priority affinity IDs.
-     * @return Best node ID in the group.
-     */
-    private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, MapperPriority priority,
-        @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) {
-        // Return the best node from the group.
-        int idx = 0;
-
-        // This is a rare situation when several nodes are started on the same host.
-        if (!grp.single()) {
-            switch (priority) {
-                case NORMAL: {
-                    // Pick any node.
-                    idx = ThreadLocalRandom.current().nextInt(grp.nodeCount());
-
-                    break;
-                }
-                case HIGH: {
-                    // Pick any affinity node.
-                    assert affIds != null;
-
-                    List<Integer> cands = new ArrayList<>();
-
-                    for (int i = 0; i < grp.nodeCount(); i++) {
-                        UUID id = grp.nodeId(i);
-
-                        if (affIds.contains(id))
-                            cands.add(i);
-                    }
-
-                    idx = cands.get(ThreadLocalRandom.current().nextInt(cands.size()));
-
-                    break;
-                }
-                default: {
-                    // Find primary node.
-                    assert prioAffId != null;
-
-                    for (int i = 0; i < grp.nodeCount(); i++) {
-                        UUID id = grp.nodeId(i);
-
-                        if (F.eq(id, prioAffId)) {
-                            idx = i;
-
-                            break;
-                        }
-                    }
-
-                    assert priority == MapperPriority.HIGHEST;
-                }
-            }
-        }
-
-        return grp.nodeId(idx);
-    }
-
-    /**
-     * Generate reducers.
-     *
-     * @param splits Input splits.
-     * @param top Topology.
-     * @param mappers Mappers.
-     * @param reducerCnt Reducer count.
-     * @return Reducers.
-     */
-    private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top,
-        Mappers mappers, int reducerCnt) {
-        Map<UUID, Integer> reducers = assignReducers0(top, splits, mappers, reducerCnt);
-
-        int cnt = 0;
-
-        Map<UUID, int[]> res = new HashMap<>(reducers.size());
-
-        for (Map.Entry<UUID, Integer> reducerEntry : reducers.entrySet()) {
-            int[] arr = new int[reducerEntry.getValue()];
-
-            for (int i = 0; i < arr.length; i++)
-                arr[i] = cnt++;
-
-            res.put(reducerEntry.getKey(), arr);
-        }
-
-        assert reducerCnt == cnt : reducerCnt + " != " + cnt;
-
-        return res;
-    }
-
-    /**
-     * Generate reducers.
-     *
-     * @param top Topology.
-     * @param splits Input splits.
-     * @param mappers Mappers.
-     * @param reducerCnt Reducer count.
-     * @return Reducers.
-     */
-    private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Collection<HadoopInputSplit> splits,
-        Mappers mappers, int reducerCnt) {
-        Map<UUID, Integer> res = new HashMap<>();
-
-        // Assign reducers to splits.
-        Map<HadoopInputSplit, Integer> splitToReducerCnt = assignReducersToSplits(splits, reducerCnt);
-
-        // Assign as much local reducers as possible.
-        int remaining = 0;
-
-        for (Map.Entry<HadoopInputSplit, Integer> entry : splitToReducerCnt.entrySet()) {
-            HadoopInputSplit split = entry.getKey();
-            int cnt = entry.getValue();
-
-            if (cnt > 0) {
-                int assigned = assignLocalReducers(split, cnt, top, mappers, res);
-
-                assert assigned <= cnt;
-
-                remaining += cnt - assigned;
-            }
-        }
-
-        // Assign the rest reducers.
-        if (remaining > 0)
-            assignRemoteReducers(remaining, top, mappers, res);
-
-        return res;
-    }
-
-    /**
-     * Assign local split reducers.
-     *
-     * @param split Split.
-     * @param cnt Reducer count.
-     * @param top Topology.
-     * @param mappers Mappers.
-     * @param resMap Reducers result map.
-     * @return Number of locally assigned reducers.
-     */
-    private int assignLocalReducers(HadoopInputSplit split, int cnt, HadoopMapReducePlanTopology top, Mappers mappers,
-        Map<UUID, Integer> resMap) {
-        // Dereference node.
-        UUID nodeId = mappers.splitToNode.get(split);
-
-        assert nodeId != null;
-
-        // Dereference group.
-        HadoopMapReducePlanGroup grp = top.groupForId(nodeId);
-
-        assert grp != null;
-
-        // Assign more reducers to the node until threshold is reached.
-        int res = 0;
-
-        while (res < cnt && grp.weight() < preferLocReducerThresholdWeight) {
-            res++;
-
-            grp.weight(grp.weight() + locReducerWeight);
-        }
-
-        // Update result map.
-        if (res > 0) {
-            Integer reducerCnt = resMap.get(nodeId);
-
-            resMap.put(nodeId, reducerCnt == null ? res : reducerCnt + res);
-        }
-
-        return res;
-    }
-
-    /**
-     * Assign remote reducers. Assign to the least loaded first.
-     *
-     * @param cnt Count.
-     * @param top Topology.
-     * @param mappers Mappers.
-     * @param resMap Reducers result map.
-     */
-    private void assignRemoteReducers(int cnt, HadoopMapReducePlanTopology top, Mappers mappers,
-        Map<UUID, Integer> resMap) {
-
-        TreeSet<HadoopMapReducePlanGroup> set = new TreeSet<>(new GroupWeightComparator());
-
-        set.addAll(top.groups());
-
-        while (cnt-- > 0) {
-            // The least loaded machine.
-            HadoopMapReducePlanGroup grp = set.first();
-
-            // Look for nodes with assigned splits.
-            List<UUID> splitNodeIds = null;
-
-            for (int i = 0; i < grp.nodeCount(); i++) {
-                UUID nodeId = grp.nodeId(i);
-
-                if (mappers.nodeToSplits.containsKey(nodeId)) {
-                    if (splitNodeIds == null)
-                        splitNodeIds = new ArrayList<>(2);
-
-                    splitNodeIds.add(nodeId);
-                }
-            }
-
-            // Select best node.
-            UUID id;
-            int newWeight;
-
-            if (splitNodeIds != null) {
-                id = splitNodeIds.get(ThreadLocalRandom.current().nextInt(splitNodeIds.size()));
-
-                newWeight = grp.weight() + locReducerWeight;
-            }
-            else {
-                id = grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount()));
-
-                newWeight = grp.weight() + rmtReducerWeight;
-            }
-
-            // Re-add entry with new weight.
-            boolean rmv = set.remove(grp);
-
-            assert rmv;
-
-            grp.weight(newWeight);
-
-            boolean add = set.add(grp);
-
-            assert add;
-
-            // Update result map.
-            Integer res = resMap.get(id);
-
-            resMap.put(id, res == null ? 1 : res + 1);
-        }
-    }
-
-    /**
-     * Comparator based on group's weight.
-     */
-    private static class GroupWeightComparator implements Comparator<HadoopMapReducePlanGroup> {
-        /** {@inheritDoc} */
-        @Override public int compare(HadoopMapReducePlanGroup first, HadoopMapReducePlanGroup second) {
-            int res = first.weight() - second.weight();
-
-            if (res < 0)
-                return -1;
-            else if (res > 0)
-                return 1;
-            else
-                return first.macs().compareTo(second.macs());
-        }
-    }
-
-    /**
-     * Distribute reducers between splits.
-     *
-     * @param splits Splits.
-     * @param reducerCnt Reducer count.
-     * @return Map from input split to reducer count.
-     */
-    private Map<HadoopInputSplit, Integer> assignReducersToSplits(Collection<HadoopInputSplit> splits,
-        int reducerCnt) {
-        Map<HadoopInputSplit, Integer> res = new IdentityHashMap<>(splits.size());
-
-        int base = reducerCnt / splits.size();
-        int remainder = reducerCnt % splits.size();
-
-        for (HadoopInputSplit split : splits) {
-            int val = base;
-
-            if (remainder > 0) {
-                val++;
-
-                remainder--;
-            }
-
-            res.put(split, val);
-        }
-
-        assert remainder == 0;
-
-        return res;
-    }
-
-    /**
-     * Calculate group priority.
-     *
-     * @param grp Group.
-     * @param affIds Affinity IDs.
-     * @param prioAffId Priority affinity ID.
-     * @return Group priority.
-     */
-    private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, @Nullable Collection<UUID> affIds,
-        @Nullable UUID prioAffId) {
-        assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == F.first(affIds);
-        assert grp != null;
-
-        MapperPriority prio = MapperPriority.NORMAL;
-
-        if (!F.isEmpty(affIds)) {
-            for (int i = 0; i < grp.nodeCount(); i++) {
-                UUID id = grp.nodeId(i);
-
-                if (affIds.contains(id)) {
-                    prio = MapperPriority.HIGH;
-
-                    if (F.eq(prioAffId, id)) {
-                        prio = MapperPriority.HIGHEST;
-
-                        break;
-                    }
-                }
-            }
-        }
-
-        return prio;
-    }
-
-    /**
-     * Get local mapper weight. This weight is added to a node when a mapper is assigned and its input split data is
-     * located on this node (at least partially).
-     * <p>
-     * Defaults to {@link #DFLT_LOC_MAPPER_WEIGHT}.
-     *
-     * @return Local mapper weight.
-     */
-    public int getLocalMapperWeight() {
-        return locMapperWeight;
-    }
-
-    /**
-     * Set local mapper weight. See {@link #getLocalMapperWeight()} for more information.
-     *
-     * @param locMapperWeight Local mapper weight.
-     */
-    public void setLocalMapperWeight(int locMapperWeight) {
-        this.locMapperWeight = locMapperWeight;
-    }
-
-    /**
-     * Get remote mapper weight. This weight is added to a node when a mapper is assigned, but its input
-     * split data is not located on this node.
-     * <p>
-     * Defaults to {@link #DFLT_RMT_MAPPER_WEIGHT}.
-     *
-     * @return Remote mapper weight.
-     */
-    public int getRemoteMapperWeight() {
-        return rmtMapperWeight;
-    }
-
-    /**
-     * Set remote mapper weight. See {@link #getRemoteMapperWeight()} for more information.
-     *
-     * @param rmtMapperWeight Remote mapper weight.
-     */
-    public void setRemoteMapperWeight(int rmtMapperWeight) {
-        this.rmtMapperWeight = rmtMapperWeight;
-    }
-
-    /**
-     * Get local reducer weight. This weight is added to a node when a reducer is assigned and the node has at least
-     * one assigned mapper.
-     * <p>
-     * Defaults to {@link #DFLT_LOC_REDUCER_WEIGHT}.
-     *
-     * @return Local reducer weight.
-     */
-    public int getLocalReducerWeight() {
-        return locReducerWeight;
-    }
-
-    /**
-     * Set local reducer weight. See {@link #getLocalReducerWeight()} for more information.
-     *
-     * @param locReducerWeight Local reducer weight.
-     */
-    public void setLocalReducerWeight(int locReducerWeight) {
-        this.locReducerWeight = locReducerWeight;
-    }
-
-    /**
-     * Get remote reducer weight. This weight is added to a node when a reducer is assigned, but the node doesn't have
-     * any assigned mappers.
-     * <p>
-     * Defaults to {@link #DFLT_RMT_REDUCER_WEIGHT}.
-     *
-     * @return Remote reducer weight.
-     */
-    public int getRemoteReducerWeight() {
-        return rmtReducerWeight;
-    }
-
-    /**
-     * Set remote reducer weight. See {@link #getRemoteReducerWeight()} for more information.
-     *
-     * @param rmtReducerWeight Remote reducer weight.
-     */
-    public void setRemoteReducerWeight(int rmtReducerWeight) {
-        this.rmtReducerWeight = rmtReducerWeight;
-    }
-
-    /**
-     * Get reducer migration threshold weight. When threshold is reached, a node with mappers is no longer considered
-     * as preferred for further reducer assignments.
-     * <p>
-     * Defaults to {@link #DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT}.
-     *
-     * @return Reducer migration threshold weight.
-     */
-    public int getPreferLocalReducerThresholdWeight() {
-        return preferLocReducerThresholdWeight;
-    }
-
-    /**
-     * Set reducer migration threshold weight. See {@link #getPreferLocalReducerThresholdWeight()} for more
-     * information.
-     *
-     * @param reducerMigrationThresholdWeight Reducer migration threshold weight.
-     */
-    public void setPreferLocalReducerThresholdWeight(int reducerMigrationThresholdWeight) {
-        this.preferLocReducerThresholdWeight = reducerMigrationThresholdWeight;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteHadoopWeightedMapReducePlanner.class, this);
-    }
-
-    /**
-     * Node ID and length.
-     */
-    private static class NodeIdAndLength implements Comparable<NodeIdAndLength> {
-        /** Node ID. */
-        private final UUID id;
-
-        /** Length. */
-        private final long len;
-
-        /**
-         * Constructor.
-         *
-         * @param id Node ID.
-         * @param len Length.
-         */
-        public NodeIdAndLength(UUID id, long len) {
-            this.id = id;
-            this.len = len;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("NullableProblems")
-        @Override public int compareTo(NodeIdAndLength obj) {
-            long res = len - obj.len;
-
-            if (res > 0)
-                return -1;
-            else if (res < 0)
-                return 1;
-            else
-                return id.compareTo(obj.id);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return id.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj instanceof NodeIdAndLength && F.eq(id, ((NodeIdAndLength)obj).id);
-        }
-    }
-
-    /**
-     * Mappers.
-     */
-    private static class Mappers {
-        /** Node-to-splits map. */
-        private final Map<UUID, Collection<HadoopInputSplit>> nodeToSplits = new HashMap<>();
-
-        /** Split-to-node map. */
-        private final Map<HadoopInputSplit, UUID> splitToNode = new IdentityHashMap<>();
-
-        /**
-         * Add mapping.
-         *
-         * @param split Split.
-         * @param node Node.
-         */
-        public void add(HadoopInputSplit split, UUID node) {
-            Collection<HadoopInputSplit> nodeSplits = nodeToSplits.get(node);
-
-            if (nodeSplits == null) {
-                nodeSplits = new HashSet<>();
-
-                nodeToSplits.put(node, nodeSplits);
-            }
-
-            nodeSplits.add(split);
-
-            splitToNode.put(split, node);
-        }
-    }
-
-    /**
-     * Mapper priority enumeration.
-     */
-    private enum MapperPriority {
-        /** Normal node. */
-        NORMAL(0),
-
-        /** (likely) Affinity node. */
-        HIGH(1),
-
-        /** (likely) Affinity node with the highest priority (e.g. because it hosts more data than other nodes). */
-        HIGHEST(2);
-
-        /** Value. */
-        private final int val;
-
-        /**
-         * Constructor.
-         *
-         * @param val Value.
-         */
-        MapperPriority(int val) {
-            this.val = val;
-        }
-
-        /**
-         * @return Value.
-         */
-        public int value() {
-            return val;
-        }
-    }
-}
\ No newline at end of file
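
The weighted planner deleted above is tuned entirely through the weight setters visible in its source. A minimal configuration sketch follows; the weight values are illustrative, and wiring the planner through HadoopConfiguration#setMapReducePlanner and IgniteConfiguration#setHadoopConfiguration is an assumption about the accelerator's configuration API that should be checked against the target Ignite version.

import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;

public class WeightedPlannerConfigSketch {
    public static IgniteConfiguration configure() {
        IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner();

        // Make affinity (local) mappers cheaper than remote ones so the planner
        // prefers data-local assignment.
        planner.setLocalMapperWeight(100);
        planner.setRemoteMapperWeight(200);

        // Keep reducers close to mappers until a node accumulates this much weight.
        planner.setLocalReducerWeight(100);
        planner.setRemoteReducerWeight(150);
        planner.setPreferLocalReducerThresholdWeight(300);

        // Assumed wiring into the Hadoop accelerator configuration.
        HadoopConfiguration hadoopCfg = new HadoopConfiguration();
        hadoopCfg.setMapReducePlanner(planner);

        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setHadoopConfiguration(hadoopCfg);

        return cfg;
    }
}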

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 65d9810..83ccdf0 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -337,30 +337,7 @@ public class HadoopUtils {
         }
     }
 
-    /**
-     * Sort input splits by length.
-     *
-     * @param splits Splits.
-     * @return Sorted splits.
-     */
-    public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) {
-        int id = 0;
-
-        TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>();
-
-        for (HadoopInputSplit split : splits) {
-            long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0;
-
-            sortedSplits.add(new SplitSortWrapper(id++, split, len));
-        }
-
-        ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size());
-
-        for (SplitSortWrapper sortedSplit : sortedSplits)
-            res.add(sortedSplit.split);
 
-        return res;
-    }
 
     /**
      * Set context class loader.
@@ -388,56 +365,4 @@ public class HadoopUtils {
         if (newLdr != oldLdr)
             Thread.currentThread().setContextClassLoader(oldLdr);
     }
-
-    /**
-     * Split wrapper for sorting.
-     */
-    private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
-        /** Unique ID. */
-        private final int id;
-
-        /** Split. */
-        private final HadoopInputSplit split;
-
-        /** Split length. */
-        private final long len;
-
-        /**
-         * Constructor.
-         *
-         * @param id Unique ID.
-         * @param split Split.
-         * @param len Split length.
-         */
-        public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
-            this.id = id;
-            this.split = split;
-            this.len = len;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("NullableProblems")
-        @Override public int compareTo(SplitSortWrapper other) {
-            assert other != null;
-
-            long res = len - other.len;
-
-            if (res > 0)
-                return -1;
-            else if (res < 0)
-                return 1;
-            else
-                return id - other.id;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return id;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
-        }
-    }
 }
\ No newline at end of file
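
The hunk above removes sortInputSplits(...) from HadoopUtils (per the diffstat it reappears in HadoopCommonUtils). Its ordering is simple: larger splits first, with the insertion index as a stable tie-breaker. A self-contained sketch of the same ordering, using a hypothetical Split stand-in for HadoopInputSplit:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SplitSortSketch {
    /** Hypothetical stand-in for HadoopInputSplit: only the length matters for ordering. */
    static final class Split {
        final int id;   // Insertion order, used as a stable tie-breaker.
        final long len; // Block length, 0 for non-file splits.

        Split(int id, long len) { this.id = id; this.len = len; }

        @Override public String toString() { return "split#" + id + "(len=" + len + ")"; }
    }

    public static void main(String[] args) {
        List<Split> splits = new ArrayList<>();
        splits.add(new Split(0, 64L));
        splits.add(new Split(1, 128L));
        splits.add(new Split(2, 64L));

        // Largest splits first; equal lengths keep their original relative order.
        splits.sort(Comparator.comparingLong((Split s) -> s.len).reversed()
            .thenComparingInt(s -> s.id));

        System.out.println(splits); // [split#1(len=128), split#0(len=64), split#2(len=64)]
    }
}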

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
deleted file mode 100644
index a44e1ae..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * IGFS endpoint abstraction.
- */
-public class HadoopIgfsEndpoint {
-    /** Localhost. */
-    public static final String LOCALHOST = "127.0.0.1";
-
-    /** IGFS name. */
-    private final String igfsName;
-
-    /** Grid name. */
-    private final String gridName;
-
-    /** Host. */
-    private final String host;
-
-    /** Port. */
-    private final int port;
-
-    /**
-     * Normalize IGFS URI.
-     *
-     * @param uri URI.
-     * @return Normalized URI.
-     * @throws IOException If failed.
-     */
-    public static URI normalize(URI uri) throws IOException {
-        try {
-            if (!F.eq(IgniteFileSystem.IGFS_SCHEME, uri.getScheme()))
-                throw new IOException("Failed to normalize UIR because it has 
non IGFS scheme: " + uri);
-
-            HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(uri.getAuthority());
-
-            StringBuilder sb = new StringBuilder();
-
-            if (endpoint.igfs() != null)
-                sb.append(endpoint.igfs());
-
-            if (endpoint.grid() != null)
-                sb.append(":").append(endpoint.grid());
-
-            return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(),
-                uri.getPath(), uri.getQuery(), uri.getFragment());
-        }
-        catch (URISyntaxException | IgniteCheckedException e) {
-            throw new IOException("Failed to normalize URI: " + uri, e);
-        }
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param connStr Connection string.
-     * @throws IgniteCheckedException If failed to parse connection string.
-     */
-    public HadoopIgfsEndpoint(@Nullable String connStr) throws IgniteCheckedException {
-        if (connStr == null)
-            connStr = "";
-
-        String[] tokens = connStr.split("@", -1);
-
-        IgniteBiTuple<String, Integer> hostPort;
-
-        if (tokens.length == 1) {
-            igfsName = null;
-            gridName = null;
-
-            hostPort = hostPort(connStr, connStr);
-        }
-        else if (tokens.length == 2) {
-            String authStr = tokens[0];
-
-            if (authStr.isEmpty()) {
-                gridName = null;
-                igfsName = null;
-            }
-            else {
-                String[] authTokens = authStr.split(":", -1);
-
-                igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0];
-
-                if (authTokens.length == 1)
-                    gridName = null;
-                else if (authTokens.length == 2)
-                    gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1];
-                else
-                    throw new IgniteCheckedException("Invalid connection 
string format: " + connStr);
-            }
-
-            hostPort = hostPort(connStr, tokens[1]);
-        }
-        else
-            throw new IgniteCheckedException("Invalid connection string 
format: " + connStr);
-
-        host = hostPort.get1();
-
-        assert hostPort.get2() != null;
-
-        port = hostPort.get2();
-    }
-
-    /**
-     * Parse host and port.
-     *
-     * @param connStr Full connection string.
-     * @param hostPortStr Host/port connection string part.
-     * @return Tuple with host and port.
-     * @throws IgniteCheckedException If failed to parse connection string.
-     */
-    private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException {
-        String[] tokens = hostPortStr.split(":", -1);
-
-        String host = tokens[0];
-
-        if (F.isEmpty(host))
-            host = LOCALHOST;
-
-        int port;
-
-        if (tokens.length == 1)
-            port = IgfsIpcEndpointConfiguration.DFLT_PORT;
-        else if (tokens.length == 2) {
-            String portStr = tokens[1];
-
-            try {
-                port = Integer.valueOf(portStr);
-
-                if (port < 0 || port > 65535)
-                    throw new IgniteCheckedException("Invalid port number: " + 
connStr);
-            }
-            catch (NumberFormatException e) {
-                throw new IgniteCheckedException("Invalid port number: " + 
connStr);
-            }
-        }
-        else
-            throw new IgniteCheckedException("Invalid connection string 
format: " + connStr);
-
-        return F.t(host, port);
-    }
-
-    /**
-     * @return IGFS name.
-     */
-    @Nullable public String igfs() {
-        return igfsName;
-    }
-
-    /**
-     * @return Grid name.
-     */
-    @Nullable public String grid() {
-        return gridName;
-    }
-
-    /**
-     * @return Host.
-     */
-    public String host() {
-        return host;
-    }
-
-    /**
-     * @return {@code True} if endpoint refers to the local host.
-     */
-    public boolean isLocal() {
-        return F.eq(LOCALHOST, host);
-    }
-
-    /**
-     * @return Port.
-     */
-    public int port() {
-        return port;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopIgfsEndpoint.class, this);
-    }
-}
\ No newline at end of file
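
For reference, HadoopIgfsEndpoint (deleted here and re-added under the common hadoop module later in this diff) parses authority strings of the form [igfsName][:gridName]@[host][:port]. A minimal usage sketch, assuming the constructor behaviour shown above; the wrapper class name and literal values are illustrative only:

    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;

    /** Illustrative only; not part of this commit. */
    public class EndpointParseExample {
        public static void main(String[] args) throws IgniteCheckedException {
            // Authority format: [igfsName][:gridName]@[host][:port].
            HadoopIgfsEndpoint ep = new HadoopIgfsEndpoint("igfs:grid0@127.0.0.1:10500");

            assert "igfs".equals(ep.igfs());
            assert "grid0".equals(ep.grid());
            assert "127.0.0.1".equals(ep.host()) && ep.port() == 10500;

            // A null or empty connection string falls back to localhost and the default IGFS IPC port.
            assert new HadoopIgfsEndpoint(null).isLocal();
        }
    }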

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
deleted file mode 100644
index f01f72b..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
-
-/**
- * Base class for map-reduce planners.
- */
-public abstract class HadoopAbstractMapReducePlanner implements 
HadoopMapReducePlanner {
-    /** Injected grid. */
-    @IgniteInstanceResource
-    protected Ignite ignite;
-
-    /** Logger. */
-    @SuppressWarnings("UnusedDeclaration")
-    @LoggerResource
-    protected IgniteLogger log;
-
-    /**
-     * Create plan topology.
-     *
-     * @param nodes Topology nodes.
-     * @return Plan topology.
-     */
-    protected static HadoopMapReducePlanTopology 
topology(Collection<ClusterNode> nodes) {
-        Map<String, HadoopMapReducePlanGroup> macsMap = new 
HashMap<>(nodes.size());
-
-        Map<UUID, HadoopMapReducePlanGroup> idToGrp = new 
HashMap<>(nodes.size());
-        Map<String, HadoopMapReducePlanGroup> hostToGrp = new 
HashMap<>(nodes.size());
-
-        for (ClusterNode node : nodes) {
-            String macs = node.attribute(ATTR_MACS);
-
-            HadoopMapReducePlanGroup grp = macsMap.get(macs);
-
-            if (grp == null) {
-                grp = new HadoopMapReducePlanGroup(node, macs);
-
-                macsMap.put(macs, grp);
-            }
-            else
-                grp.add(node);
-
-            idToGrp.put(node.id(), grp);
-
-            for (String host : node.addresses()) {
-                HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host);
-
-                if (hostGrp == null)
-                    hostToGrp.put(host, grp);
-                else
-                    assert hostGrp == grp;
-            }
-        }
-
-        return new HadoopMapReducePlanTopology(new 
ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
-    }
-
-
-    /**
-     * Groups nodes by host names.
-     *
-     * @param top Topology to group.
-     * @return Map.
-     */
-    protected static Map<String, Collection<UUID>> 
groupByHost(Collection<ClusterNode> top) {
-        Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
-
-        for (ClusterNode node : top) {
-            for (String host : node.hostNames()) {
-                Collection<UUID> nodeIds = grouped.get(host);
-
-                if (nodeIds == null) {
-                    // Expecting 1-2 nodes per host.
-                    nodeIds = new ArrayList<>(2);
-
-                    grouped.put(host, nodeIds);
-                }
-
-                nodeIds.add(node.id());
-            }
-        }
-
-        return grouped;
-    }
-}
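
The abstract planner above only injects the Ignite instance and the logger and exposes the two static grouping helpers; concrete planners implement preparePlan(). A hedged sketch of a deliberately naive subclass (not part of this commit) showing where topology() fits; it assumes a non-empty topology:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.UUID;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.cluster.ClusterNode;
    import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
    import org.apache.ignite.internal.processors.hadoop.HadoopJob;
    import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
    import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
    import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
    import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
    import org.jetbrains.annotations.Nullable;

    /** Deliberately naive planner; illustrates the extension point only. */
    public class SingleNodeMapReducePlanner extends HadoopAbstractMapReducePlanner {
        /** {@inheritDoc} */
        @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
            @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
            // The base class collapses co-located nodes (same MAC set) into groups.
            HadoopMapReducePlanTopology planTop = topology(top);

            // Degenerate plan: all splits and a single reducer go to the first node of the first group.
            UUID nodeId = planTop.groups().get(0).nodeId(0);

            Collection<HadoopInputSplit> splits = new ArrayList<>();

            for (HadoopInputSplit split : job.input())
                splits.add(split);

            Map<UUID, Collection<HadoopInputSplit>> mappers = Collections.singletonMap(nodeId, splits);
            Map<UUID, int[]> reducers = Collections.singletonMap(nodeId, new int[] {0});

            return new HadoopDefaultMapReducePlan(mappers, reducers);
        }
    }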

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
deleted file mode 100644
index 15c62c8..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Map-reduce plan.
- */
-public class HadoopDefaultMapReducePlan implements HadoopMapReducePlan {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Mappers map. */
-    private Map<UUID, Collection<HadoopInputSplit>> mappers;
-
-    /** Reducers map. */
-    private Map<UUID, int[]> reducers;
-
-    /** Mappers count. */
-    private int mappersCnt;
-
-    /** Reducers count. */
-    private int reducersCnt;
-
-    /**
-     * @param mappers Mappers map.
-     * @param reducers Reducers map.
-     */
-    public HadoopDefaultMapReducePlan(Map<UUID, Collection<HadoopInputSplit>> 
mappers,
-        Map<UUID, int[]> reducers) {
-        this.mappers = mappers;
-        this.reducers = reducers;
-
-        if (mappers != null) {
-            for (Collection<HadoopInputSplit> splits : mappers.values())
-                mappersCnt += splits.size();
-        }
-
-        if (reducers != null) {
-            for (int[] rdcrs : reducers.values())
-                reducersCnt += rdcrs.length;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int mappers() {
-        return mappersCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int reducers() {
-        return reducersCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID nodeForReducer(int reducer) {
-        assert reducer >= 0 && reducer < reducersCnt : reducer;
-
-        for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) {
-            for (int r : entry.getValue()) {
-                if (r == reducer)
-                    return entry.getKey();
-            }
-        }
-
-        throw new IllegalStateException("Reducer index not found: " + reducer);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public Collection<HadoopInputSplit> mappers(UUID 
nodeId) {
-        return mappers.get(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public int[] reducers(UUID nodeId) {
-        return reducers.get(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> mapperNodeIds() {
-        return mappers.keySet();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> reducerNodeIds() {
-        return reducers.keySet();
-    }
-}
\ No newline at end of file
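
The default plan above is a plain value object: the constructor derives the mapper and reducer counts, and nodeForReducer() walks the reducer arrays. A small hedged sketch of its accounting (class name and node IDs are made up; mapper splits are left out for brevity):

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
    import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;

    /** Illustrative only; not part of this commit. */
    public class PlanCountsExample {
        public static void main(String[] args) {
            UUID n1 = UUID.randomUUID();
            UUID n2 = UUID.randomUUID();

            // Mapper assignments omitted for brevity.
            Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();

            Map<UUID, int[]> reducers = new HashMap<>();

            reducers.put(n1, new int[] {0, 1}); // Reducers 0 and 1 run on n1.
            reducers.put(n2, new int[] {2});    // Reducer 2 runs on n2.

            HadoopDefaultMapReducePlan plan = new HadoopDefaultMapReducePlan(mappers, reducers);

            assert plan.mappers() == 0 && plan.reducers() == 3;
            assert n2.equals(plan.nodeForReducer(2));
        }
    }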

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
deleted file mode 100644
index 2fe8682..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.ArrayList;
-import java.util.UUID;
-
-/**
- * Map-reduce plan group of nodes on a single physical machine.
- */
-public class HadoopMapReducePlanGroup {
-    /** Node. */
-    private ClusterNode node;
-
-    /** Nodes. */
-    private ArrayList<ClusterNode> nodes;
-
-    /** MAC addresses. */
-    private final String macs;
-
-    /** Weight. */
-    private int weight;
-
-    /**
-     * Constructor.
-     *
-     * @param node First node in the group.
-     * @param macs MAC addresses.
-     */
-    public HadoopMapReducePlanGroup(ClusterNode node, String macs) {
-        assert node != null;
-        assert macs != null;
-
-        this.node = node;
-        this.macs = macs;
-    }
-
-    /**
-     * Add node to the group.
-     *
-     * @param newNode New node.
-     */
-    public void add(ClusterNode newNode) {
-        if (node != null) {
-            nodes = new ArrayList<>(2);
-
-            nodes.add(node);
-
-            node = null;
-        }
-
-        nodes.add(newNode);
-    }
-
-    /**
-     * @return MAC addresses.
-     */
-    public String macs() {
-        return macs;
-    }
-
-    /**
-     * @return {@code True} if only a single node is present.
-     */
-    public boolean single() {
-        return nodeCount() == 1;
-    }
-
-    /**
-     * Get node ID by index.
-     *
-     * @param idx Index.
-     * @return Node.
-     */
-    public UUID nodeId(int idx) {
-        ClusterNode res;
-
-        if (node != null) {
-            assert idx == 0;
-
-            res = node;
-        }
-        else {
-            assert nodes != null;
-            assert idx < nodes.size();
-
-            res = nodes.get(idx);
-        }
-
-        assert res != null;
-
-        return res.id();
-    }
-
-    /**
-     * @return Node count.
-     */
-    public int nodeCount() {
-        return node != null ? 1 : nodes.size();
-    }
-
-    /**
-     * @return Weight.
-     */
-    public int weight() {
-        return weight;
-    }
-
-    /**
-     * @param weight Weight.
-     */
-    public void weight(int weight) {
-        this.weight = weight;
-    }
-
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return macs.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object obj) {
-        return obj instanceof HadoopMapReducePlanGroup && F.eq(macs, 
((HadoopMapReducePlanGroup)obj).macs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopMapReducePlanGroup.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
deleted file mode 100644
index fa5c469..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * Map-reduce plan topology.
- */
-public class HadoopMapReducePlanTopology {
-    /** All groups. */
-    private final List<HadoopMapReducePlanGroup> grps;
-
-    /** Node ID to group map. */
-    private final Map<UUID, HadoopMapReducePlanGroup> idToGrp;
-
-    /** Host to group map. */
-    private final Map<String, HadoopMapReducePlanGroup> hostToGrp;
-
-    /**
-     * Constructor.
-     *
-     * @param grps All groups.
-     * @param idToGrp ID to group map.
-     * @param hostToGrp Host to group map.
-     */
-    public HadoopMapReducePlanTopology(List<HadoopMapReducePlanGroup> grps,
-        Map<UUID, HadoopMapReducePlanGroup> idToGrp, Map<String, 
HadoopMapReducePlanGroup> hostToGrp) {
-        assert grps != null;
-        assert idToGrp != null;
-        assert hostToGrp != null;
-
-        this.grps = grps;
-        this.idToGrp = idToGrp;
-        this.hostToGrp = hostToGrp;
-    }
-
-    /**
-     * @return All groups.
-     */
-    public List<HadoopMapReducePlanGroup> groups() {
-        return grps;
-    }
-
-    /**
-     * Get group for node ID.
-     *
-     * @param id Node ID.
-     * @return Group.
-     */
-    public HadoopMapReducePlanGroup groupForId(UUID id) {
-        return idToGrp.get(id);
-    }
-
-    /**
-     * Get group for host.
-     *
-     * @param host Host.
-     * @return Group.
-     */
-    @Nullable public HadoopMapReducePlanGroup groupForHost(String host) {
-        return hostToGrp.get(host);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopMapReducePlanTopology.class, this);
-    }
-}
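
The planner re-added below spreads reducers over nodes in proportion to how many splits each node received, rounding the proportional weights and then correcting the total one reducer at a time. The rounding step can be checked with plain arithmetic; the numbers and class name in this sketch are made up:

    /** Illustrative only: 3 nodes with 2, 2 and 1 mapped splits, 4 reducers requested. */
    public class ReducerWeightExample {
        public static void main(String[] args) {
            int[] splitCnt = {2, 2, 1};
            int reducerCnt = 4;

            int totalWeight = 2 + 2 + 1;

            int[] weights = new int[splitCnt.length];
            int totalAdjusted = 0;

            for (int i = 0; i < splitCnt.length; i++) {
                float floatWeight = (float)splitCnt[i] * reducerCnt / totalWeight; // 1.6, 1.6, 0.8

                weights[i] = Math.round(floatWeight); // 2, 2, 1 -> sums to 5.

                totalAdjusted += weights[i];
            }

            // The planner then walks the weight-sorted nodes, removing (or adding) one reducer
            // at a time until the total matches the requested count: here 5 - 4 = one removal.
            assert totalAdjusted == 5;
        }
    }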

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
new file mode 100644
index 0000000..e1101c5
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
+
+/**
+ * Default map-reduce planner implementation.
+ */
+public class IgniteHadoopMapReducePlanner extends 
HadoopAbstractMapReducePlanner {
+    /** {@inheritDoc} */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, 
Collection<ClusterNode> top,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        // Convert collection of topology nodes to collection of topology node 
IDs.
+        Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f);
+
+        for (ClusterNode topNode : top)
+            topIds.add(topNode.id());
+
+        Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(top, topIds, 
job.input());
+
+        int rdcCnt = job.info().reducers();
+
+        if (rdcCnt < 0)
+            throw new IgniteCheckedException("Number of reducers must be 
non-negative, actual: " + rdcCnt);
+
+        Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt);
+
+        return new HadoopDefaultMapReducePlan(mappers, reducers);
+    }
+
+    /**
+     * Create plan for mappers.
+     *
+     * @param top Topology nodes.
+     * @param topIds Topology node IDs.
+     * @param splits Splits.
+     * @return Mappers map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Map<UUID, Collection<HadoopInputSplit>> 
mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
+        Iterable<HadoopInputSplit> splits) throws IgniteCheckedException {
+        Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
+
+        Map<String, Collection<UUID>> nodes = groupByHost(top);
+
+        Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // 
Track node load.
+
+        for (UUID nodeId : topIds)
+            nodeLoads.put(nodeId, 0);
+
+        for (HadoopInputSplit split : splits) {
+            UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);
+
+            if (log.isDebugEnabled())
+                log.debug("Mapped split to node [split=" + split + ", nodeId=" 
+ nodeId + ']');
+
+            Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId);
+
+            if (nodeSplits == null) {
+                nodeSplits = new ArrayList<>();
+
+                mappers.put(nodeId, nodeSplits);
+            }
+
+            nodeSplits.add(split);
+
+            // Update node load.
+            nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1);
+        }
+
+        return mappers;
+    }
+
+    /**
+     * Determine the best node for this split.
+     *
+     * @param split Split.
+     * @param topIds Topology node IDs.
+     * @param nodes Nodes.
+     * @param nodeLoads Node load tracker.
+     * @return Node ID.
+     */
+    @SuppressWarnings("unchecked")
+    private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, 
Map<String, Collection<UUID>> nodes,
+        Map<UUID, Integer> nodeLoads) throws IgniteCheckedException {
+        if (split instanceof HadoopFileBlock) {
+            HadoopFileBlock split0 = (HadoopFileBlock)split;
+
+            if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
+                HadoopIgfsEndpoint endpoint = new 
HadoopIgfsEndpoint(split0.file().getAuthority());
+
+                IgfsEx igfs = null;
+
+                if (F.eq(ignite.name(), endpoint.grid()))
+                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
+
+                if (igfs != null && !igfs.isProxy(split0.file())) {
+                    IgfsPath path = new IgfsPath(split0.file());
+
+                    if (igfs.exists(path)) {
+                        Collection<IgfsBlockLocation> blocks;
+
+                        try {
+                            blocks = igfs.affinity(path, split0.start(), 
split0.length());
+                        }
+                        catch (IgniteException e) {
+                            throw new IgniteCheckedException(e);
+                        }
+
+                        assert blocks != null;
+
+                        if (blocks.size() == 1)
+                            // Fast-path, split consists of one IGFS block (as 
in most cases).
+                            return 
bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false);
+                        else {
+                            // Slow-path, split consists of multiple IGFS 
blocks. First, find the most co-located nodes.
+                            Map<UUID, Long> nodeMap = new HashMap<>();
+
+                            List<UUID> bestNodeIds = null;
+                            long bestLen = -1L;
+
+                            for (IgfsBlockLocation block : blocks) {
+                                for (UUID blockNodeId : block.nodeIds()) {
+                                    if (topIds.contains(blockNodeId)) {
+                                        Long oldLen = nodeMap.get(blockNodeId);
+                                        long newLen = oldLen == null ? 
block.length() : oldLen + block.length();
+
+                                        nodeMap.put(blockNodeId, newLen);
+
+                                        if (bestNodeIds == null || bestLen < 
newLen) {
+                                            bestNodeIds = new ArrayList<>(1);
+
+                                            bestNodeIds.add(blockNodeId);
+
+                                            bestLen = newLen;
+                                        }
+                                        else if (bestLen == newLen) {
+                                            assert !F.isEmpty(bestNodeIds);
+
+                                            bestNodeIds.add(blockNodeId);
+                                        }
+                                    }
+                                }
+                            }
+
+                            if (bestNodeIds != null) {
+                                return bestNodeIds.size() == 1 ? 
bestNodeIds.get(0) :
+                                    bestNode(bestNodeIds, topIds, nodeLoads, 
true);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        // Cannot use local IGFS for some reason, try selecting the node by 
host.
+        Collection<UUID> blockNodes = null;
+
+        for (String host : split.hosts()) {
+            Collection<UUID> hostNodes = nodes.get(host);
+
+            if (!F.isEmpty(hostNodes)) {
+                if (blockNodes == null)
+                    blockNodes = new ArrayList<>(hostNodes);
+                else
+                    blockNodes.addAll(hostNodes);
+            }
+        }
+
+        return bestNode(blockNodes, topIds, nodeLoads, false);
+    }
+
+    /**
+     * Finds the best (the least loaded) node among the candidates.
+     *
+     * @param candidates Candidates.
+     * @param topIds Topology node IDs.
+     * @param nodeLoads Known node loads.
+     * @param skipTopCheck Whether to skip topology check.
+     * @return The best node.
+     */
+    private UUID bestNode(@Nullable Collection<UUID> candidates, 
Collection<UUID> topIds, Map<UUID, Integer> nodeLoads,
+        boolean skipTopCheck) {
+        UUID bestNode = null;
+        int bestLoad = Integer.MAX_VALUE;
+
+        if (candidates != null) {
+            for (UUID candidate : candidates) {
+                if (skipTopCheck || topIds.contains(candidate)) {
+                    int load = nodeLoads.get(candidate);
+
+                    if (bestNode == null || bestLoad > load) {
+                        bestNode = candidate;
+                        bestLoad = load;
+
+                        if (bestLoad == 0)
+                            break; // Minimum load possible, no need for 
further iterations.
+                    }
+                }
+            }
+        }
+
+        if (bestNode == null) {
+            // Blocks are located on nodes which are not Hadoop-enabled, 
assign to the least loaded one.
+            bestLoad = Integer.MAX_VALUE;
+
+            for (UUID nodeId : topIds) {
+                int load = nodeLoads.get(nodeId);
+
+                if (bestNode == null || bestLoad > load) {
+                    bestNode = nodeId;
+                    bestLoad = load;
+
+                    if (bestLoad == 0)
+                        break; // Minimum load possible, no need for further 
iterations.
+                }
+            }
+        }
+
+        assert bestNode != null;
+
+        return bestNode;
+    }
+
+    /**
+     * Create plan for reducers.
+     *
+     * @param top Topology.
+     * @param mappers Mappers map.
+     * @param reducerCnt Reducers count.
+     * @return Reducers map.
+     */
+    private Map<UUID, int[]> reducers(Collection<ClusterNode> top,
+        Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) {
+        // Determine initial node weights.
+        int totalWeight = 0;
+
+        List<WeightedNode> nodes = new ArrayList<>(top.size());
+
+        for (ClusterNode node : top) {
+            Collection<HadoopInputSplit> split = mappers.get(node.id());
+
+            int weight = reducerNodeWeight(node, split != null ? split.size() 
: 0);
+
+            nodes.add(new WeightedNode(node.id(), weight, weight));
+
+            totalWeight += weight;
+        }
+
+        // Adjust weights.
+        int totalAdjustedWeight = 0;
+
+        for (WeightedNode node : nodes) {
+            node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight;
+
+            node.weight = Math.round(node.floatWeight);
+
+            totalAdjustedWeight += node.weight;
+        }
+
+        // Apply redundant/lost reducers.
+        Collections.sort(nodes);
+
+        if (totalAdjustedWeight > reducerCnt) {
+            // Too many reducers set.
+            ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() 
- 1);
+
+            while (totalAdjustedWeight != reducerCnt) {
+                if (!iter.hasPrevious())
+                    iter = nodes.listIterator(nodes.size() - 1);
+
+                WeightedNode node = iter.previous();
+
+                if (node.weight > 0) {
+                    node.weight -= 1;
+
+                    totalAdjustedWeight--;
+                }
+            }
+        }
+        else if (totalAdjustedWeight < reducerCnt) {
+            // Not enough reducers set.
+            ListIterator<WeightedNode> iter = nodes.listIterator(0);
+
+            while (totalAdjustedWeight != reducerCnt) {
+                if (!iter.hasNext())
+                    iter = nodes.listIterator(0);
+
+                WeightedNode node = iter.next();
+
+                if (node.floatWeight > 0.0f) {
+                    node.weight += 1;
+
+                    totalAdjustedWeight++;
+                }
+            }
+        }
+
+        int idx = 0;
+
+        Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f);
+
+        for (WeightedNode node : nodes) {
+            if (node.weight > 0) {
+                int[] arr = new int[node.weight];
+
+                for (int i = 0; i < arr.length; i++)
+                    arr[i] = idx++;
+
+                reducers.put(node.nodeId, arr);
+            }
+        }
+
+        return reducers;
+    }
+
+    /**
+     * Calculate node weight based on node metrics and data co-location.
+     *
+     * @param node Node.
+     * @param splitCnt Splits mapped to this node.
+     * @return Node weight.
+     */
+    @SuppressWarnings("UnusedParameters")
+    protected int reducerNodeWeight(ClusterNode node, int splitCnt) {
+        return splitCnt;
+    }
+
+    /**
+     * Weighted node.
+     */
+    private static class WeightedNode implements Comparable<WeightedNode> {
+        /** Node ID. */
+        private final UUID nodeId;
+
+        /** Weight. */
+        private int weight;
+
+        /** Floating point weight. */
+        private float floatWeight;
+
+        /**
+         * Constructor.
+         *
+         * @param nodeId Node ID.
+         * @param weight Weight.
+         * @param floatWeight Floating point weight.
+         */
+        private WeightedNode(UUID nodeId, int weight, float floatWeight) {
+            this.nodeId = nodeId;
+            this.weight = weight;
+            this.floatWeight = floatWeight;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof WeightedNode && F.eq(nodeId, 
((WeightedNode)obj).nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return nodeId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(@NotNull WeightedNode other) {
+            float res = other.floatWeight - floatWeight;
+
+            return res > 0.0f ? 1 : res < 0.0f ? -1 : 
nodeId.compareTo(other.nodeId);
+        }
+    }
+}
\ No newline at end of file
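
Since the planner keeps its public package (org.apache.ignite.hadoop.mapreduce) while moving to the common module, existing configurations should be unaffected. A hedged wiring sketch, assuming the usual HadoopConfiguration and IgniteConfiguration setters (which are not part of this diff):

    import org.apache.ignite.configuration.HadoopConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;
    import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;

    /** Sketch only; setter names are assumptions, not part of this commit. */
    public class PlannerConfigExample {
        public static IgniteConfiguration configure() {
            HadoopConfiguration hadoopCfg = new HadoopConfiguration();

            hadoopCfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());

            IgniteConfiguration cfg = new IgniteConfiguration();

            cfg.setHadoopConfiguration(hadoopCfg);

            return cfg;
        }
    }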
