IGNITE-3916: Moved MR planner to common module.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b66b969 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b66b969 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b66b969 Branch: refs/heads/ignite-3916 Commit: 4b66b969cb447895bfee142701633d99885cdf12 Parents: 5515280 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Fri Sep 16 16:09:34 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Fri Sep 16 16:09:34 2016 +0300 ---------------------------------------------------------------------- .../mapreduce/IgniteHadoopMapReducePlanner.java | 416 --------- .../IgniteHadoopWeightedMapReducePlanner.java | 846 ------------------- .../internal/processors/hadoop/HadoopUtils.java | 75 -- .../hadoop/igfs/HadoopIgfsEndpoint.java | 210 ----- .../planner/HadoopAbstractMapReducePlanner.java | 116 --- .../planner/HadoopDefaultMapReducePlan.java | 109 --- .../planner/HadoopMapReducePlanGroup.java | 150 ---- .../planner/HadoopMapReducePlanTopology.java | 89 -- .../mapreduce/IgniteHadoopMapReducePlanner.java | 416 +++++++++ .../IgniteHadoopWeightedMapReducePlanner.java | 846 +++++++++++++++++++ .../processors/hadoop/HadoopCommonUtils.java | 110 +++ .../hadoop/igfs/HadoopIgfsEndpoint.java | 210 +++++ .../planner/HadoopAbstractMapReducePlanner.java | 116 +++ .../planner/HadoopDefaultMapReducePlan.java | 110 +++ .../planner/HadoopMapReducePlanGroup.java | 150 ++++ .../planner/HadoopMapReducePlanTopology.java | 89 ++ 16 files changed, 2047 insertions(+), 2011 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java deleted file mode 100644 index d4a44fa..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java +++ /dev/null @@ -1,416 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.mapreduce; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.UUID; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME; - -/** - * Default map-reduce planner implementation. - */ -public class IgniteHadoopMapReducePlanner extends HadoopAbstractMapReducePlanner { - /** {@inheritDoc} */ - @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, - @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { - // Convert collection of topology nodes to collection of topology node IDs. - Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f); - - for (ClusterNode topNode : top) - topIds.add(topNode.id()); - - Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(top, topIds, job.input()); - - int rdcCnt = job.info().reducers(); - - if (rdcCnt < 0) - throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt); - - Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt); - - return new HadoopDefaultMapReducePlan(mappers, reducers); - } - - /** - * Create plan for mappers. - * - * @param top Topology nodes. - * @param topIds Topology node IDs. - * @param splits Splits. - * @return Mappers map. - * @throws IgniteCheckedException If failed. - */ - private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds, - Iterable<HadoopInputSplit> splits) throws IgniteCheckedException { - Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); - - Map<String, Collection<UUID>> nodes = groupByHost(top); - - Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load. - - for (UUID nodeId : topIds) - nodeLoads.put(nodeId, 0); - - for (HadoopInputSplit split : splits) { - UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads); - - if (log.isDebugEnabled()) - log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']'); - - Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId); - - if (nodeSplits == null) { - nodeSplits = new ArrayList<>(); - - mappers.put(nodeId, nodeSplits); - } - - nodeSplits.add(split); - - // Updated node load. - nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1); - } - - return mappers; - } - - /** - * Determine the best node for this split. 
- * - * @param split Split. - * @param topIds Topology node IDs. - * @param nodes Nodes. - * @param nodeLoads Node load tracker. - * @return Node ID. - */ - @SuppressWarnings("unchecked") - private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes, - Map<UUID, Integer> nodeLoads) throws IgniteCheckedException { - if (split instanceof HadoopFileBlock) { - HadoopFileBlock split0 = (HadoopFileBlock)split; - - if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) { - HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority()); - - IgfsEx igfs = null; - - if (F.eq(ignite.name(), endpoint.grid())) - igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs()); - - if (igfs != null && !igfs.isProxy(split0.file())) { - IgfsPath path = new IgfsPath(split0.file()); - - if (igfs.exists(path)) { - Collection<IgfsBlockLocation> blocks; - - try { - blocks = igfs.affinity(path, split0.start(), split0.length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - - assert blocks != null; - - if (blocks.size() == 1) - // Fast-path, split consists of one IGFS block (as in most cases). - return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false); - else { - // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes. - Map<UUID, Long> nodeMap = new HashMap<>(); - - List<UUID> bestNodeIds = null; - long bestLen = -1L; - - for (IgfsBlockLocation block : blocks) { - for (UUID blockNodeId : block.nodeIds()) { - if (topIds.contains(blockNodeId)) { - Long oldLen = nodeMap.get(blockNodeId); - long newLen = oldLen == null ? block.length() : oldLen + block.length(); - - nodeMap.put(blockNodeId, newLen); - - if (bestNodeIds == null || bestLen < newLen) { - bestNodeIds = new ArrayList<>(1); - - bestNodeIds.add(blockNodeId); - - bestLen = newLen; - } - else if (bestLen == newLen) { - assert !F.isEmpty(bestNodeIds); - - bestNodeIds.add(blockNodeId); - } - } - } - } - - if (bestNodeIds != null) { - return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : - bestNode(bestNodeIds, topIds, nodeLoads, true); - } - } - } - } - } - } - - // Cannot use local IGFS for some reason, try selecting the node by host. - Collection<UUID> blockNodes = null; - - for (String host : split.hosts()) { - Collection<UUID> hostNodes = nodes.get(host); - - if (!F.isEmpty(hostNodes)) { - if (blockNodes == null) - blockNodes = new ArrayList<>(hostNodes); - else - blockNodes.addAll(hostNodes); - } - } - - return bestNode(blockNodes, topIds, nodeLoads, false); - } - - /** - * Finds the best (the least loaded) node among the candidates. - * - * @param candidates Candidates. - * @param topIds Topology node IDs. - * @param nodeLoads Known node loads. - * @param skipTopCheck Whether to skip topology check. - * @return The best node. - */ - private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads, - boolean skipTopCheck) { - UUID bestNode = null; - int bestLoad = Integer.MAX_VALUE; - - if (candidates != null) { - for (UUID candidate : candidates) { - if (skipTopCheck || topIds.contains(candidate)) { - int load = nodeLoads.get(candidate); - - if (bestNode == null || bestLoad > load) { - bestNode = candidate; - bestLoad = load; - - if (bestLoad == 0) - break; // Minimum load possible, no need for further iterations. 
- } - } - } - - if (bestNode == null) { - // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one. - bestLoad = Integer.MAX_VALUE; - - for (UUID nodeId : topIds) { - int load = nodeLoads.get(nodeId); - - if (bestNode == null || bestLoad > load) { - bestNode = nodeId; - bestLoad = load; - - if (bestLoad == 0) - break; // Minimum load possible, no need for further iterations. - } - } - } - - assert bestNode != null; - - return bestNode; - } - - /** - * Create plan for reducers. - * - * @param top Topology. - * @param mappers Mappers map. - * @param reducerCnt Reducers count. - * @return Reducers map. - */ - private Map<UUID, int[]> reducers(Collection<ClusterNode> top, - Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) { - // Determine initial node weights. - int totalWeight = 0; - - List<WeightedNode> nodes = new ArrayList<>(top.size()); - - for (ClusterNode node : top) { - Collection<HadoopInputSplit> split = mappers.get(node.id()); - - int weight = reducerNodeWeight(node, split != null ? split.size() : 0); - - nodes.add(new WeightedNode(node.id(), weight, weight)); - - totalWeight += weight; - } - - // Adjust weights. - int totalAdjustedWeight = 0; - - for (WeightedNode node : nodes) { - node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight; - - node.weight = Math.round(node.floatWeight); - - totalAdjustedWeight += node.weight; - } - - // Correct for surplus/missing reducers caused by rounding. - Collections.sort(nodes); - - if (totalAdjustedWeight > reducerCnt) { - // Too many reducers assigned. - ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1); - - while (totalAdjustedWeight != reducerCnt) { - if (!iter.hasPrevious()) - iter = nodes.listIterator(nodes.size() - 1); - - WeightedNode node = iter.previous(); - - if (node.weight > 0) { - node.weight -= 1; - - totalAdjustedWeight--; - } - } - } - else if (totalAdjustedWeight < reducerCnt) { - // Not enough reducers assigned. - ListIterator<WeightedNode> iter = nodes.listIterator(0); - - while (totalAdjustedWeight != reducerCnt) { - if (!iter.hasNext()) - iter = nodes.listIterator(0); - - WeightedNode node = iter.next(); - - if (node.floatWeight > 0.0f) { - node.weight += 1; - - totalAdjustedWeight++; - } - } - } - - int idx = 0; - - Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f); - - for (WeightedNode node : nodes) { - if (node.weight > 0) { - int[] arr = new int[node.weight]; - - for (int i = 0; i < arr.length; i++) - arr[i] = idx++; - - reducers.put(node.nodeId, arr); - } - } - - return reducers; - } - - /** - * Calculate node weight based on node metrics and data co-location. - * - * @param node Node. - * @param splitCnt Splits mapped to this node. - * @return Node weight. - */ - @SuppressWarnings("UnusedParameters") - protected int reducerNodeWeight(ClusterNode node, int splitCnt) { - return splitCnt; - } - - /** - * Weighted node. - */ - private static class WeightedNode implements Comparable<WeightedNode> { - /** Node ID. */ - private final UUID nodeId; - - /** Weight. */ - private int weight; - - /** Floating point weight. */ - private float floatWeight; - - /** - * Constructor. - * - * @param nodeId Node ID. - * @param weight Weight. - * @param floatWeight Floating point weight.
- */ - private WeightedNode(UUID nodeId, int weight, float floatWeight) { - this.nodeId = nodeId; - this.weight = weight; - this.floatWeight = floatWeight; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return nodeId.hashCode(); - } - - /** {@inheritDoc} */ - @Override public int compareTo(@NotNull WeightedNode other) { - float res = other.floatWeight - floatWeight; - - return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java deleted file mode 100644 index 27ffc19..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ /dev/null @@ -1,846 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.mapreduce; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; - -/** - * Map-reduce planner which assigns mappers and reducers based on their "weights". Weight describes how many resources - * are required to execute a particular map or reduce task. - * <p> - * Plan creation consists of two steps: assigning mappers and assigning reducers. - * <p> - * Mappers are assigned based on input split data location. For each input split we search for nodes where - * its data is stored. The planner tries to assign mappers to their affinity nodes first. This process is governed by two - * properties: - * <ul> - * <li><b>{@code localMapperWeight}</b> - weight of a map task when it is executed on an affinity node;</li> - * <li><b>{@code remoteMapperWeight}</b> - weight of a map task when it is executed on a non-affinity node.</li> - * </ul> - * The planning algorithm assigns mappers so that the total resulting weight across all nodes is as small as possible. - * <p> - * Reducers are assigned differently. First we try to distribute reducers across nodes with mappers. This approach - * minimizes expensive data transfer over the network. A reducer assigned to a node with a mapper is considered - * <b>{@code local}</b>. Otherwise it is considered <b>{@code remote}</b>. This process continues until a certain weight - * threshold is reached, which means that the node is already too busy and should no longer have higher priority over - * other nodes. The threshold can be configured using the <b>{@code preferLocalReducerThresholdWeight}</b> property. - * <p> - * When the local reducer threshold is reached on all nodes, we distribute the remaining reducers based on their local and - * remote weights in the same way as it is done for mappers.
This process is governed by two - * properties: - * <ul> - * <li><b>{@code localReducerWeight}</b> - weight of a reduce task when it is executed on a node with mappers;</li> - * <li><b>{@code remoteReducerWeight}</b> - weight of a reduce task when it is executed on a node without mappers.</li> - * </ul> - */ -public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReducePlanner { - /** Default local mapper weight. */ - public static final int DFLT_LOC_MAPPER_WEIGHT = 100; - - /** Default remote mapper weight. */ - public static final int DFLT_RMT_MAPPER_WEIGHT = 100; - - /** Default local reducer weight. */ - public static final int DFLT_LOC_REDUCER_WEIGHT = 100; - - /** Default remote reducer weight. */ - public static final int DFLT_RMT_REDUCER_WEIGHT = 100; - - /** Default reducer migration threshold weight. */ - public static final int DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT = 200; - - /** Local mapper weight. */ - private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT; - - /** Remote mapper weight. */ - private int rmtMapperWeight = DFLT_RMT_MAPPER_WEIGHT; - - /** Local reducer weight. */ - private int locReducerWeight = DFLT_LOC_REDUCER_WEIGHT; - - /** Remote reducer weight. */ - private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT; - - /** Reducer migration threshold weight. */ - private int preferLocReducerThresholdWeight = DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT; - - /** {@inheritDoc} */ - @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes, - @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { - List<HadoopInputSplit> splits = HadoopUtils.sortInputSplits(job.input()); - int reducerCnt = job.info().reducers(); - - if (reducerCnt < 0) - throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + reducerCnt); - - HadoopMapReducePlanTopology top = topology(nodes); - - Mappers mappers = assignMappers(splits, top); - - Map<UUID, int[]> reducers = assignReducers(splits, top, mappers, reducerCnt); - - return new HadoopDefaultMapReducePlan(mappers.nodeToSplits, reducers); - } - - /** - * Assign mappers to nodes. - * - * @param splits Input splits. - * @param top Topology. - * @return Mappers. - * @throws IgniteCheckedException If failed. - */ - private Mappers assignMappers(Collection<HadoopInputSplit> splits, - HadoopMapReducePlanTopology top) throws IgniteCheckedException { - Mappers res = new Mappers(); - - for (HadoopInputSplit split : splits) { - // Try getting IGFS affinity. - Collection<UUID> nodeIds = affinityNodesForSplit(split, top); - - // Get best node. - UUID node = bestMapperNode(nodeIds, top); - - assert node != null; - - res.add(split, node); - } - - return res; - } - - /** - * Get affinity nodes for the given input split. - * <p> - * Order in the returned collection *is* significant, meaning that nodes containing more data - * go first. This way, the first nodes in the collection are considered preferable for scheduling. - * - * @param split Split. - * @param top Topology. - * @return Affinity nodes. - * @throws IgniteCheckedException If failed. - */ - private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top) - throws IgniteCheckedException { - Collection<UUID> igfsNodeIds = igfsAffinityNodesForSplit(split); - - if (igfsNodeIds != null) - return igfsNodeIds; - - Map<NodeIdAndLength, UUID> res = new TreeMap<>(); - - for (String host : split.hosts()) { - long len = split instanceof HadoopFileBlock ?
((HadoopFileBlock)split).length() : 0L; - - HadoopMapReducePlanGroup grp = top.groupForHost(host); - - if (grp != null) { - for (int i = 0; i < grp.nodeCount(); i++) { - UUID nodeId = grp.nodeId(i); - - res.put(new NodeIdAndLength(nodeId, len), nodeId); - } - } - } - - return new LinkedHashSet<>(res.values()); - } - - /** - * Get IGFS affinity nodes for split if possible. - * <p> - * Order in the returned collection *is* significant, meaning that nodes containing more data - * go first. This way, the first nodes in the collection are considered preferable for scheduling. - * - * @param split Input split. - * @return IGFS affinity or {@code null} if IGFS is not available. - * @throws IgniteCheckedException If failed. - */ - @Nullable private Collection<UUID> igfsAffinityNodesForSplit(HadoopInputSplit split) throws IgniteCheckedException { - if (split instanceof HadoopFileBlock) { - HadoopFileBlock split0 = (HadoopFileBlock)split; - - if (IgniteFileSystem.IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) { - HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority()); - - IgfsEx igfs = null; - - if (F.eq(ignite.name(), endpoint.grid())) - igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs()); - - if (igfs != null && !igfs.isProxy(split0.file())) { - IgfsPath path = new IgfsPath(split0.file()); - - if (igfs.exists(path)) { - Collection<IgfsBlockLocation> blocks; - - try { - blocks = igfs.affinity(path, split0.start(), split0.length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException("Failed to get IGFS file block affinity [path=" + path + - ", start=" + split0.start() + ", len=" + split0.length() + ']', e); - } - - assert blocks != null; - - if (blocks.size() == 1) - return blocks.iterator().next().nodeIds(); - else { - // The most "local" nodes go first. - Map<UUID, Long> idToLen = new HashMap<>(); - - for (IgfsBlockLocation block : blocks) { - for (UUID id : block.nodeIds()) { - Long len = idToLen.get(id); - - idToLen.put(id, len == null ? block.length() : block.length() + len); - } - } - - // Sort the nodes in non-ascending order by contained data lengths. - Map<NodeIdAndLength, UUID> res = new TreeMap<>(); - - for (Map.Entry<UUID, Long> idToLenEntry : idToLen.entrySet()) { - UUID id = idToLenEntry.getKey(); - - res.put(new NodeIdAndLength(id, idToLenEntry.getValue()), id); - } - - return new LinkedHashSet<>(res.values()); - } - } - } - } - } - - return null; - } - - /** - * Find best mapper node. - * - * @param affIds Affinity node IDs. - * @param top Topology. - * @return Best node ID. - */ - private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) { - // Priority node. - UUID prioAffId = F.first(affIds); - - // Find group with the least weight. - HadoopMapReducePlanGroup resGrp = null; - MapperPriority resPrio = MapperPriority.NORMAL; - int resWeight = Integer.MAX_VALUE; - - for (HadoopMapReducePlanGroup grp : top.groups()) { - MapperPriority prio = groupPriority(grp, affIds, prioAffId); - - int weight = grp.weight() + (prio == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight); - - if (resGrp == null || weight < resWeight || weight == resWeight && prio.value() > resPrio.value()) { - resGrp = grp; - resPrio = prio; - resWeight = weight; - } - } - - assert resGrp != null; - - // Update group weight for further runs. - resGrp.weight(resWeight); - - // Return the best node from the group.
- return bestMapperNodeForGroup(resGrp, resPrio, affIds, prioAffId); - } - - /** - * Get best node in the group. - * - * @param grp Group. - * @param priority Priority. - * @param affIds Affinity IDs. - * @param prioAffId Priority affinity ID. - * @return Best node ID in the group. - */ - private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, MapperPriority priority, - @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) { - // Return the best node from the group. - int idx = 0; - - // This is a rare situation where several nodes are started on the same host. - if (!grp.single()) { - switch (priority) { - case NORMAL: { - // Pick any node. - idx = ThreadLocalRandom.current().nextInt(grp.nodeCount()); - - break; - } - case HIGH: { - // Pick any affinity node. - assert affIds != null; - - List<Integer> cands = new ArrayList<>(); - - for (int i = 0; i < grp.nodeCount(); i++) { - UUID id = grp.nodeId(i); - - if (affIds.contains(id)) - cands.add(i); - } - - idx = cands.get(ThreadLocalRandom.current().nextInt(cands.size())); - - break; - } - default: { - // Find primary node. - assert prioAffId != null; - - for (int i = 0; i < grp.nodeCount(); i++) { - UUID id = grp.nodeId(i); - - if (F.eq(id, prioAffId)) { - idx = i; - - break; - } - } - - assert priority == MapperPriority.HIGHEST; - } - } - } - - return grp.nodeId(idx); - } - - /** - * Generate reducers. - * - * @param splits Input splits. - * @param top Topology. - * @param mappers Mappers. - * @param reducerCnt Reducer count. - * @return Reducers. - */ - private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top, - Mappers mappers, int reducerCnt) { - Map<UUID, Integer> reducers = assignReducers0(top, splits, mappers, reducerCnt); - - int cnt = 0; - - Map<UUID, int[]> res = new HashMap<>(reducers.size()); - - for (Map.Entry<UUID, Integer> reducerEntry : reducers.entrySet()) { - int[] arr = new int[reducerEntry.getValue()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = cnt++; - - res.put(reducerEntry.getKey(), arr); - } - - assert reducerCnt == cnt : reducerCnt + " != " + cnt; - - return res; - } - - /** - * Generate reducers. - * - * @param top Topology. - * @param splits Input splits. - * @param mappers Mappers. - * @param reducerCnt Reducer count. - * @return Reducers. - */ - private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Collection<HadoopInputSplit> splits, - Mappers mappers, int reducerCnt) { - Map<UUID, Integer> res = new HashMap<>(); - - // Assign reducers to splits. - Map<HadoopInputSplit, Integer> splitToReducerCnt = assignReducersToSplits(splits, reducerCnt); - - // Assign as many local reducers as possible. - int remaining = 0; - - for (Map.Entry<HadoopInputSplit, Integer> entry : splitToReducerCnt.entrySet()) { - HadoopInputSplit split = entry.getKey(); - int cnt = entry.getValue(); - - if (cnt > 0) { - int assigned = assignLocalReducers(split, cnt, top, mappers, res); - - assert assigned <= cnt; - - remaining += cnt - assigned; - } - } - - // Assign the remaining reducers. - if (remaining > 0) - assignRemoteReducers(remaining, top, mappers, res); - - return res; - } - - /** - * Assign local split reducers. - * - * @param split Split. - * @param cnt Reducer count. - * @param top Topology. - * @param mappers Mappers. - * @param resMap Reducers result map. - * @return Number of locally assigned reducers.
- */ - private int assignLocalReducers(HadoopInputSplit split, int cnt, HadoopMapReducePlanTopology top, Mappers mappers, - Map<UUID, Integer> resMap) { - // Dereference node. - UUID nodeId = mappers.splitToNode.get(split); - - assert nodeId != null; - - // Dereference group. - HadoopMapReducePlanGroup grp = top.groupForId(nodeId); - - assert grp != null; - - // Assign more reducers to the node until threshold is reached. - int res = 0; - - while (res < cnt && grp.weight() < preferLocReducerThresholdWeight) { - res++; - - grp.weight(grp.weight() + locReducerWeight); - } - - // Update result map. - if (res > 0) { - Integer reducerCnt = resMap.get(nodeId); - - resMap.put(nodeId, reducerCnt == null ? res : reducerCnt + res); - } - - return res; - } - - /** - * Assign remote reducers. Assign to the least loaded first. - * - * @param cnt Count. - * @param top Topology. - * @param mappers Mappers. - * @param resMap Reducers result map. - */ - private void assignRemoteReducers(int cnt, HadoopMapReducePlanTopology top, Mappers mappers, - Map<UUID, Integer> resMap) { - - TreeSet<HadoopMapReducePlanGroup> set = new TreeSet<>(new GroupWeightComparator()); - - set.addAll(top.groups()); - - while (cnt-- > 0) { - // The least loaded machine. - HadoopMapReducePlanGroup grp = set.first(); - - // Look for nodes with assigned splits. - List<UUID> splitNodeIds = null; - - for (int i = 0; i < grp.nodeCount(); i++) { - UUID nodeId = grp.nodeId(i); - - if (mappers.nodeToSplits.containsKey(nodeId)) { - if (splitNodeIds == null) - splitNodeIds = new ArrayList<>(2); - - splitNodeIds.add(nodeId); - } - } - - // Select best node. - UUID id; - int newWeight; - - if (splitNodeIds != null) { - id = splitNodeIds.get(ThreadLocalRandom.current().nextInt(splitNodeIds.size())); - - newWeight = grp.weight() + locReducerWeight; - } - else { - id = grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount())); - - newWeight = grp.weight() + rmtReducerWeight; - } - - // Re-add entry with new weight. - boolean rmv = set.remove(grp); - - assert rmv; - - grp.weight(newWeight); - - boolean add = set.add(grp); - - assert add; - - // Update result map. - Integer res = resMap.get(id); - - resMap.put(id, res == null ? 1 : res + 1); - } - } - - /** - * Comparator based on group's weight. - */ - private static class GroupWeightComparator implements Comparator<HadoopMapReducePlanGroup> { - /** {@inheritDoc} */ - @Override public int compare(HadoopMapReducePlanGroup first, HadoopMapReducePlanGroup second) { - int res = first.weight() - second.weight(); - - if (res < 0) - return -1; - else if (res > 0) - return 1; - else - return first.macs().compareTo(second.macs()); - } - } - - /** - * Distribute reducers between splits. - * - * @param splits Splits. - * @param reducerCnt Reducer count. - * @return Map from input split to reducer count. - */ - private Map<HadoopInputSplit, Integer> assignReducersToSplits(Collection<HadoopInputSplit> splits, - int reducerCnt) { - Map<HadoopInputSplit, Integer> res = new IdentityHashMap<>(splits.size()); - - int base = reducerCnt / splits.size(); - int remainder = reducerCnt % splits.size(); - - for (HadoopInputSplit split : splits) { - int val = base; - - if (remainder > 0) { - val++; - - remainder--; - } - - res.put(split, val); - } - - assert remainder == 0; - - return res; - } - - /** - * Calculate group priority. - * - * @param grp Group. - * @param affIds Affinity IDs. - * @param prioAffId Priority affinity ID. - * @return Group priority. 
- */ - private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, @Nullable Collection<UUID> affIds, - @Nullable UUID prioAffId) { - assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == F.first(affIds); - assert grp != null; - - MapperPriority prio = MapperPriority.NORMAL; - - if (!F.isEmpty(affIds)) { - for (int i = 0; i < grp.nodeCount(); i++) { - UUID id = grp.nodeId(i); - - if (affIds.contains(id)) { - prio = MapperPriority.HIGH; - - if (F.eq(prioAffId, id)) { - prio = MapperPriority.HIGHEST; - - break; - } - } - } - } - - return prio; - } - - /** - * Get local mapper weight. This weight is added to a node when a mapper is assigned and its input split data is - * located on this node (at least partially). - * <p> - * Defaults to {@link #DFLT_LOC_MAPPER_WEIGHT}. - * - * @return Local mapper weight. - */ - public int getLocalMapperWeight() { - return locMapperWeight; - } - - /** - * Set local mapper weight. See {@link #getLocalMapperWeight()} for more information. - * - * @param locMapperWeight Local mapper weight. - */ - public void setLocalMapperWeight(int locMapperWeight) { - this.locMapperWeight = locMapperWeight; - } - - /** - * Get remote mapper weight. This weight is added to a node when a mapper is assigned, but its input - * split data is not located on this node. - * <p> - * Defaults to {@link #DFLT_RMT_MAPPER_WEIGHT}. - * - * @return Remote mapper weight. - */ - public int getRemoteMapperWeight() { - return rmtMapperWeight; - } - - /** - * Set remote mapper weight. See {@link #getRemoteMapperWeight()} for more information. - * - * @param rmtMapperWeight Remote mapper weight. - */ - public void setRemoteMapperWeight(int rmtMapperWeight) { - this.rmtMapperWeight = rmtMapperWeight; - } - - /** - * Get local reducer weight. This weight is added to a node when a reducer is assigned and the node has at least - * one assigned mapper. - * <p> - * Defaults to {@link #DFLT_LOC_REDUCER_WEIGHT}. - * - * @return Local reducer weight. - */ - public int getLocalReducerWeight() { - return locReducerWeight; - } - - /** - * Set local reducer weight. See {@link #getLocalReducerWeight()} for more information. - * - * @param locReducerWeight Local reducer weight. - */ - public void setLocalReducerWeight(int locReducerWeight) { - this.locReducerWeight = locReducerWeight; - } - - /** - * Get remote reducer weight. This weight is added to a node when a reducer is assigned, but the node doesn't have - * any assigned mappers. - * <p> - * Defaults to {@link #DFLT_RMT_REDUCER_WEIGHT}. - * - * @return Remote reducer weight. - */ - public int getRemoteReducerWeight() { - return rmtReducerWeight; - } - - /** - * Set remote reducer weight. See {@link #getRemoteReducerWeight()} for more information. - * - * @param rmtReducerWeight Remote reducer weight. - */ - public void setRemoteReducerWeight(int rmtReducerWeight) { - this.rmtReducerWeight = rmtReducerWeight; - } - - /** - * Get reducer migration threshold weight. When the threshold is reached, a node with mappers is no longer - * considered preferred for further reducer assignments. - * <p> - * Defaults to {@link #DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT}. - * - * @return Reducer migration threshold weight. - */ - public int getPreferLocalReducerThresholdWeight() { - return preferLocReducerThresholdWeight; - } - - /** - * Set reducer migration threshold weight. See {@link #getPreferLocalReducerThresholdWeight()} for more - * information. - * - * @param reducerMigrationThresholdWeight Reducer migration threshold weight.
- */ - public void setPreferLocalReducerThresholdWeight(int reducerMigrationThresholdWeight) { - this.preferLocReducerThresholdWeight = reducerMigrationThresholdWeight; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteHadoopWeightedMapReducePlanner.class, this); - } - - /** - * Node ID and length. - */ - private static class NodeIdAndLength implements Comparable<NodeIdAndLength> { - /** Node ID. */ - private final UUID id; - - /** Length. */ - private final long len; - - /** - * Constructor. - * - * @param id Node ID. - * @param len Length. - */ - public NodeIdAndLength(UUID id, long len) { - this.id = id; - this.len = len; - } - - /** {@inheritDoc} */ - @SuppressWarnings("NullableProblems") - @Override public int compareTo(NodeIdAndLength obj) { - long res = len - obj.len; - - if (res > 0) - return -1; - else if (res < 0) - return 1; - else - return id.compareTo(obj.id); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj instanceof NodeIdAndLength && F.eq(id, ((NodeIdAndLength)obj).id); - } - } - - /** - * Mappers. - */ - private static class Mappers { - /** Node-to-splits map. */ - private final Map<UUID, Collection<HadoopInputSplit>> nodeToSplits = new HashMap<>(); - - /** Split-to-node map. */ - private final Map<HadoopInputSplit, UUID> splitToNode = new IdentityHashMap<>(); - - /** - * Add mapping. - * - * @param split Split. - * @param node Node. - */ - public void add(HadoopInputSplit split, UUID node) { - Collection<HadoopInputSplit> nodeSplits = nodeToSplits.get(node); - - if (nodeSplits == null) { - nodeSplits = new HashSet<>(); - - nodeToSplits.put(node, nodeSplits); - } - - nodeSplits.add(split); - - splitToNode.put(split, node); - } - } - - /** - * Mapper priority enumeration. - */ - private enum MapperPriority { - /** Normal node. */ - NORMAL(0), - - /** (likely) Affinity node. */ - HIGH(1), - - /** (likely) Affinity node with the highest priority (e.g. because it hosts more data than other nodes). */ - HIGHEST(2); - - /** Value. */ - private final int val; - - /** - * Constructor. - * - * @param val Value. - */ - MapperPriority(int val) { - this.val = val; - } - - /** - * @return Value. - */ - public int value() { - return val; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 65d9810..83ccdf0 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -337,30 +337,7 @@ public class HadoopUtils { } } - /** - * Sort input splits by length. - * - * @param splits Splits. - * @return Sorted splits. - */ - public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) { - int id = 0; - - TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>(); - - for (HadoopInputSplit split : splits) { - long len = split instanceof HadoopFileBlock ? 
((HadoopFileBlock)split).length() : 0; - - sortedSplits.add(new SplitSortWrapper(id++, split, len)); - } - - ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size()); - - for (SplitSortWrapper sortedSplit : sortedSplits) - res.add(sortedSplit.split); - return res; - } /** * Set context class loader. @@ -388,56 +365,4 @@ public class HadoopUtils { if (newLdr != oldLdr) Thread.currentThread().setContextClassLoader(oldLdr); } - - /** - * Split wrapper for sorting. - */ - private static class SplitSortWrapper implements Comparable<SplitSortWrapper> { - /** Unique ID. */ - private final int id; - - /** Split. */ - private final HadoopInputSplit split; - - /** Split length. */ - private final long len; - - /** - * Constructor. - * - * @param id Unique ID. - * @param split Split. - * @param len Split length. - */ - public SplitSortWrapper(int id, HadoopInputSplit split, long len) { - this.id = id; - this.split = split; - this.len = len; - } - - /** {@inheritDoc} */ - @SuppressWarnings("NullableProblems") - @Override public int compareTo(SplitSortWrapper other) { - assert other != null; - - long res = len - other.len; - - if (res > 0) - return -1; - else if (res < 0) - return 1; - else - return id - other.id; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id; - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java deleted file mode 100644 index a44e1ae..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteBiTuple; -import org.jetbrains.annotations.Nullable; - -/** - * IGFS endpoint abstraction. 
- */ -public class HadoopIgfsEndpoint { - /** Localhost. */ - public static final String LOCALHOST = "127.0.0.1"; - - /** IGFS name. */ - private final String igfsName; - - /** Grid name. */ - private final String gridName; - - /** Host. */ - private final String host; - - /** Port. */ - private final int port; - - /** - * Normalize IGFS URI. - * - * @param uri URI. - * @return Normalized URI. - * @throws IOException If failed. - */ - public static URI normalize(URI uri) throws IOException { - try { - if (!F.eq(IgniteFileSystem.IGFS_SCHEME, uri.getScheme())) - throw new IOException("Failed to normalize URI because it has a non-IGFS scheme: " + uri); - - HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(uri.getAuthority()); - - StringBuilder sb = new StringBuilder(); - - if (endpoint.igfs() != null) - sb.append(endpoint.igfs()); - - if (endpoint.grid() != null) - sb.append(":").append(endpoint.grid()); - - return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(), - uri.getPath(), uri.getQuery(), uri.getFragment()); - } - catch (URISyntaxException | IgniteCheckedException e) { - throw new IOException("Failed to normalize URI: " + uri, e); - } - } - - /** - * Constructor. - * - * @param connStr Connection string. - * @throws IgniteCheckedException If failed to parse connection string. - */ - public HadoopIgfsEndpoint(@Nullable String connStr) throws IgniteCheckedException { - if (connStr == null) - connStr = ""; - - String[] tokens = connStr.split("@", -1); - - IgniteBiTuple<String, Integer> hostPort; - - if (tokens.length == 1) { - igfsName = null; - gridName = null; - - hostPort = hostPort(connStr, connStr); - } - else if (tokens.length == 2) { - String authStr = tokens[0]; - - if (authStr.isEmpty()) { - gridName = null; - igfsName = null; - } - else { - String[] authTokens = authStr.split(":", -1); - - igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0]; - - if (authTokens.length == 1) - gridName = null; - else if (authTokens.length == 2) - gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1]; - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - } - - hostPort = hostPort(connStr, tokens[1]); - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - host = hostPort.get1(); - - assert hostPort.get2() != null; - - port = hostPort.get2(); - } - - /** - * Parse host and port. - * - * @param connStr Full connection string. - * @param hostPortStr Host/port connection string part. - * @return Tuple with host and port. - * @throws IgniteCheckedException If failed to parse connection string. - */ - private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException { - String[] tokens = hostPortStr.split(":", -1); - - String host = tokens[0]; - - if (F.isEmpty(host)) - host = LOCALHOST; - - int port; - - if (tokens.length == 1) - port = IgfsIpcEndpointConfiguration.DFLT_PORT; - else if (tokens.length == 2) { - String portStr = tokens[1]; - - try { - port = Integer.valueOf(portStr); - - if (port < 0 || port > 65535) - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - catch (NumberFormatException e) { - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - return F.t(host, port); - } - - /** - * @return IGFS name.
- */ - @Nullable public String igfs() { - return igfsName; - } - - /** - * @return Grid name. - */ - @Nullable public String grid() { - return gridName; - } - - /** - * @return Host. - */ - public String host() { - return host; - } - - /** - * @return {@code True} if the endpoint refers to the local host. - */ - public boolean isLocal() { - return F.eq(LOCALHOST, host); - } - - /** - * @return Port. - */ - public int port() { - return port; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopIgfsEndpoint.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java deleted file mode 100644 index f01f72b..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.planner; - -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; - -/** - * Base class for map-reduce planners. - */ -public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner { - /** Injected grid. */ - @IgniteInstanceResource - protected Ignite ignite; - - /** Logger. */ - @SuppressWarnings("UnusedDeclaration") - @LoggerResource - protected IgniteLogger log; - - /** - * Create plan topology. - * - * @param nodes Topology nodes. - * @return Plan topology.
- */ - protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) { - Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size()); - - Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size()); - Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size()); - - for (ClusterNode node : nodes) { - String macs = node.attribute(ATTR_MACS); - - HadoopMapReducePlanGroup grp = macsMap.get(macs); - - if (grp == null) { - grp = new HadoopMapReducePlanGroup(node, macs); - - macsMap.put(macs, grp); - } - else - grp.add(node); - - idToGrp.put(node.id(), grp); - - for (String host : node.addresses()) { - HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host); - - if (hostGrp == null) - hostToGrp.put(host, grp); - else - assert hostGrp == grp; - } - } - - return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp); - } - - - /** - * Groups nodes by host names. - * - * @param top Topology to group. - * @return Map. - */ - protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) { - Map<String, Collection<UUID>> grouped = U.newHashMap(top.size()); - - for (ClusterNode node : top) { - for (String host : node.hostNames()) { - Collection<UUID> nodeIds = grouped.get(host); - - if (nodeIds == null) { - // Expecting 1-2 nodes per host. - nodeIds = new ArrayList<>(2); - - grouped.put(host, nodeIds); - } - - nodeIds.add(node.id()); - } - } - - return grouped; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java deleted file mode 100644 index 15c62c8..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.planner; - -import java.util.Collection; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.jetbrains.annotations.Nullable; - -/** - * Map-reduce plan. - */ -public class HadoopDefaultMapReducePlan implements HadoopMapReducePlan { - /** */ - private static final long serialVersionUID = 0L; - - /** Mappers map. 
*/ - private Map<UUID, Collection<HadoopInputSplit>> mappers; - - /** Reducers map. */ - private Map<UUID, int[]> reducers; - - /** Mappers count. */ - private int mappersCnt; - - /** Reducers count. */ - private int reducersCnt; - - /** - * @param mappers Mappers map. - * @param reducers Reducers map. - */ - public HadoopDefaultMapReducePlan(Map<UUID, Collection<HadoopInputSplit>> mappers, - Map<UUID, int[]> reducers) { - this.mappers = mappers; - this.reducers = reducers; - - if (mappers != null) { - for (Collection<HadoopInputSplit> splits : mappers.values()) - mappersCnt += splits.size(); - } - - if (reducers != null) { - for (int[] rdcrs : reducers.values()) - reducersCnt += rdcrs.length; - } - } - - /** {@inheritDoc} */ - @Override public int mappers() { - return mappersCnt; - } - - /** {@inheritDoc} */ - @Override public int reducers() { - return reducersCnt; - } - - /** {@inheritDoc} */ - @Override public UUID nodeForReducer(int reducer) { - assert reducer >= 0 && reducer < reducersCnt : reducer; - - for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) { - for (int r : entry.getValue()) { - if (r == reducer) - return entry.getKey(); - } - } - - throw new IllegalStateException("Not found reducer index: " + reducer); - } - - /** {@inheritDoc} */ - @Override @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId) { - return mappers.get(nodeId); - } - - /** {@inheritDoc} */ - @Override @Nullable public int[] reducers(UUID nodeId) { - return reducers.get(nodeId); - } - - /** {@inheritDoc} */ - @Override public Collection<UUID> mapperNodeIds() { - return mappers.keySet(); - } - - /** {@inheritDoc} */ - @Override public Collection<UUID> reducerNodeIds() { - return reducers.keySet(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java deleted file mode 100644 index 2fe8682..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.planner; - -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; - -import java.util.ArrayList; -import java.util.UUID; - -/** - * Map-reduce plan group of nodes on a single physical machine. - */ -public class HadoopMapReducePlanGroup { - /** Node. */ - private ClusterNode node; - - /** Nodes. */ - private ArrayList<ClusterNode> nodes; - - /** MAC addresses. */ - private final String macs; - - /** Weight. */ - private int weight; - - /** - * Constructor. - * - * @param node First node in the group. - * @param macs MAC addresses. - */ - public HadoopMapReducePlanGroup(ClusterNode node, String macs) { - assert node != null; - assert macs != null; - - this.node = node; - this.macs = macs; - } - - /** - * Add node to the group. - * - * @param newNode New node. - */ - public void add(ClusterNode newNode) { - if (node != null) { - nodes = new ArrayList<>(2); - - nodes.add(node); - - node = null; - } - - nodes.add(newNode); - } - - /** - * @return MAC addresses. - */ - public String macs() { - return macs; - } - - /** - * @return {@code True} if only a single node is present. - */ - public boolean single() { - return nodeCount() == 1; - } - - /** - * Get node ID by index. - * - * @param idx Index. - * @return Node ID. - */ - public UUID nodeId(int idx) { - ClusterNode res; - - if (node != null) { - assert idx == 0; - - res = node; - } - else { - assert nodes != null; - assert idx < nodes.size(); - - res = nodes.get(idx); - } - - assert res != null; - - return res.id(); - } - - /** - * @return Node count. - */ - public int nodeCount() { - return node != null ? 1 : nodes.size(); - } - - /** - * @return Weight. - */ - public int weight() { - return weight; - } - - /** - * @param weight Weight. - */ - public void weight(int weight) { - this.weight = weight; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return macs.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj instanceof HadoopMapReducePlanGroup && F.eq(macs, ((HadoopMapReducePlanGroup)obj).macs); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopMapReducePlanGroup.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java deleted file mode 100644 index fa5c469..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * Map-reduce plan topology.
- */
-public class HadoopMapReducePlanTopology {
-    /** All groups. */
-    private final List<HadoopMapReducePlanGroup> grps;
-
-    /** Node ID to group map. */
-    private final Map<UUID, HadoopMapReducePlanGroup> idToGrp;
-
-    /** Host to group map. */
-    private final Map<String, HadoopMapReducePlanGroup> hostToGrp;
-
-    /**
-     * Constructor.
-     *
-     * @param grps All groups.
-     * @param idToGrp ID to group map.
-     * @param hostToGrp Host to group map.
-     */
-    public HadoopMapReducePlanTopology(List<HadoopMapReducePlanGroup> grps,
-        Map<UUID, HadoopMapReducePlanGroup> idToGrp, Map<String, HadoopMapReducePlanGroup> hostToGrp) {
-        assert grps != null;
-        assert idToGrp != null;
-        assert hostToGrp != null;
-
-        this.grps = grps;
-        this.idToGrp = idToGrp;
-        this.hostToGrp = hostToGrp;
-    }
-
-    /**
-     * @return All groups.
-     */
-    public List<HadoopMapReducePlanGroup> groups() {
-        return grps;
-    }
-
-    /**
-     * Get group for node ID.
-     *
-     * @param id Node ID.
-     * @return Group.
-     */
-    public HadoopMapReducePlanGroup groupForId(UUID id) {
-        return idToGrp.get(id);
-    }
-
-    /**
-     * Get group for host.
-     *
-     * @param host Host.
-     * @return Group.
-     */
-    @Nullable public HadoopMapReducePlanGroup groupForHost(String host) {
-        return hostToGrp.get(host);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopMapReducePlanTopology.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
new file mode 100644
index 0000000..e1101c5
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
+
+/**
+ * Default map-reduce planner implementation.
+ */
+public class IgniteHadoopMapReducePlanner extends HadoopAbstractMapReducePlanner {
+    /** {@inheritDoc} */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        // Convert collection of topology nodes to collection of topology node IDs.
+        Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f);
+
+        for (ClusterNode topNode : top)
+            topIds.add(topNode.id());
+
+        Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(top, topIds, job.input());
+
+        int rdcCnt = job.info().reducers();
+
+        if (rdcCnt < 0)
+            throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt);
+
+        Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt);
+
+        return new HadoopDefaultMapReducePlan(mappers, reducers);
+    }
+
+    /**
+     * Create plan for mappers.
+     *
+     * @param top Topology nodes.
+     * @param topIds Topology node IDs.
+     * @param splits Splits.
+     * @return Mappers map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
+        Iterable<HadoopInputSplit> splits) throws IgniteCheckedException {
+        Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
+
+        Map<String, Collection<UUID>> nodes = groupByHost(top);
+
+        Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load.
+
+        for (UUID nodeId : topIds)
+            nodeLoads.put(nodeId, 0);
+
+        for (HadoopInputSplit split : splits) {
+            UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);
+
+            if (log.isDebugEnabled())
+                log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']');
+
+            Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId);
+
+            if (nodeSplits == null) {
+                nodeSplits = new ArrayList<>();
+
+                mappers.put(nodeId, nodeSplits);
+            }
+
+            nodeSplits.add(split);
+
+            // Update node load.
+            nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1);
+        }
+
+        return mappers;
+    }
+
+    /**
+     * Determine the best node for this split.
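+     * <p>
+     * For a split backed by an IGFS file the planner first tries to place the mapper on a node
+     * hosting the underlying IGFS block(s); otherwise the split hosts are matched against the
+     * topology, and as a last resort the least loaded node is picked.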
+     *
+     * @param split Split.
+     * @param topIds Topology node IDs.
+     * @param nodes Nodes.
+     * @param nodeLoads Node load tracker.
+     * @return Node ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes,
+        Map<UUID, Integer> nodeLoads) throws IgniteCheckedException {
+        if (split instanceof HadoopFileBlock) {
+            HadoopFileBlock split0 = (HadoopFileBlock)split;
+
+            if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
+                HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority());
+
+                IgfsEx igfs = null;
+
+                if (F.eq(ignite.name(), endpoint.grid()))
+                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
+
+                if (igfs != null && !igfs.isProxy(split0.file())) {
+                    IgfsPath path = new IgfsPath(split0.file());
+
+                    if (igfs.exists(path)) {
+                        Collection<IgfsBlockLocation> blocks;
+
+                        try {
+                            blocks = igfs.affinity(path, split0.start(), split0.length());
+                        }
+                        catch (IgniteException e) {
+                            throw new IgniteCheckedException(e);
+                        }
+
+                        assert blocks != null;
+
+                        if (blocks.size() == 1)
+                            // Fast-path, split consists of one IGFS block (as in most cases).
+                            return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false);
+                        else {
+                            // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes.
+                            Map<UUID, Long> nodeMap = new HashMap<>();
+
+                            List<UUID> bestNodeIds = null;
+                            long bestLen = -1L;
+
+                            for (IgfsBlockLocation block : blocks) {
+                                for (UUID blockNodeId : block.nodeIds()) {
+                                    if (topIds.contains(blockNodeId)) {
+                                        Long oldLen = nodeMap.get(blockNodeId);
+                                        long newLen = oldLen == null ? block.length() : oldLen + block.length();
+
+                                        nodeMap.put(blockNodeId, newLen);
+
+                                        if (bestNodeIds == null || bestLen < newLen) {
+                                            bestNodeIds = new ArrayList<>(1);
+
+                                            bestNodeIds.add(blockNodeId);
+
+                                            bestLen = newLen;
+                                        }
+                                        else if (bestLen == newLen) {
+                                            assert !F.isEmpty(bestNodeIds);
+
+                                            bestNodeIds.add(blockNodeId);
+                                        }
+                                    }
+                                }
+                            }
+
+                            if (bestNodeIds != null) {
+                                return bestNodeIds.size() == 1 ? bestNodeIds.get(0) :
+                                    bestNode(bestNodeIds, topIds, nodeLoads, true);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        // Cannot use local IGFS for some reason, try selecting the node by host.
+        Collection<UUID> blockNodes = null;
+
+        for (String host : split.hosts()) {
+            Collection<UUID> hostNodes = nodes.get(host);
+
+            if (!F.isEmpty(hostNodes)) {
+                if (blockNodes == null)
+                    blockNodes = new ArrayList<>(hostNodes);
+                else
+                    blockNodes.addAll(hostNodes);
+            }
+        }
+
+        return bestNode(blockNodes, topIds, nodeLoads, false);
+    }
+
+    /**
+     * Finds the best (the least loaded) node among the candidates.
+     *
+     * @param candidates Candidates.
+     * @param topIds Topology node IDs.
+     * @param nodeLoads Known node loads.
+     * @param skipTopCheck Whether to skip topology check.
+     * @return The best node.
+     */
+    private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads,
+        boolean skipTopCheck) {
+        UUID bestNode = null;
+        int bestLoad = Integer.MAX_VALUE;
+
+        if (candidates != null) {
+            for (UUID candidate : candidates) {
+                if (skipTopCheck || topIds.contains(candidate)) {
+                    int load = nodeLoads.get(candidate);
+
+                    if (bestNode == null || bestLoad > load) {
+                        bestNode = candidate;
+                        bestLoad = load;
+
+                        if (bestLoad == 0)
+                            break; // Minimum load possible, no need for further iterations.
+                    }
+                }
+            }
+        }
+
+        if (bestNode == null) {
+            // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one.
+            bestLoad = Integer.MAX_VALUE;
+
+            for (UUID nodeId : topIds) {
+                int load = nodeLoads.get(nodeId);
+
+                if (bestNode == null || bestLoad > load) {
+                    bestNode = nodeId;
+                    bestLoad = load;
+
+                    if (bestLoad == 0)
+                        break; // Minimum load possible, no need for further iterations.
+                }
+            }
+        }
+
+        assert bestNode != null;
+
+        return bestNode;
+    }
+
+    /**
+     * Create plan for reducers.
+     *
+     * @param top Topology.
+     * @param mappers Mappers map.
+     * @param reducerCnt Reducers count.
+     * @return Reducers map.
+     */
+    private Map<UUID, int[]> reducers(Collection<ClusterNode> top,
+        Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) {
+        // Determine initial node weights.
+        int totalWeight = 0;
+
+        List<WeightedNode> nodes = new ArrayList<>(top.size());
+
+        for (ClusterNode node : top) {
+            Collection<HadoopInputSplit> split = mappers.get(node.id());
+
+            int weight = reducerNodeWeight(node, split != null ? split.size() : 0);
+
+            nodes.add(new WeightedNode(node.id(), weight, weight));
+
+            totalWeight += weight;
+        }
+
+        // Adjust weights.
+        int totalAdjustedWeight = 0;
+
+        for (WeightedNode node : nodes) {
+            node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight;
+
+            node.weight = Math.round(node.floatWeight);
+
+            totalAdjustedWeight += node.weight;
+        }
+
+        // Redistribute surplus or missing reducers caused by rounding.
+        Collections.sort(nodes);
+
+        if (totalAdjustedWeight > reducerCnt) {
+            // Too many reducers set.
+            ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1);
+
+            while (totalAdjustedWeight != reducerCnt) {
+                if (!iter.hasPrevious())
+                    iter = nodes.listIterator(nodes.size() - 1);
+
+                WeightedNode node = iter.previous();
+
+                if (node.weight > 0) {
+                    node.weight -= 1;
+
+                    totalAdjustedWeight--;
+                }
+            }
+        }
+        else if (totalAdjustedWeight < reducerCnt) {
+            // Not enough reducers set.
+            ListIterator<WeightedNode> iter = nodes.listIterator(0);
+
+            while (totalAdjustedWeight != reducerCnt) {
+                if (!iter.hasNext())
+                    iter = nodes.listIterator(0);
+
+                WeightedNode node = iter.next();
+
+                if (node.floatWeight > 0.0f) {
+                    node.weight += 1;
+
+                    totalAdjustedWeight++;
+                }
+            }
+        }
+
+        int idx = 0;
+
+        Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f);
+
+        for (WeightedNode node : nodes) {
+            if (node.weight > 0) {
+                int[] arr = new int[node.weight];
+
+                for (int i = 0; i < arr.length; i++)
+                    arr[i] = idx++;
+
+                reducers.put(node.nodeId, arr);
+            }
+        }
+
+        return reducers;
+    }
+
+    /**
+     * Calculate node weight based on node metrics and data co-location.
+     *
+     * @param node Node.
+     * @param splitCnt Splits mapped to this node.
+     * @return Node weight.
+     */
+    @SuppressWarnings("UnusedParameters")
+    protected int reducerNodeWeight(ClusterNode node, int splitCnt) {
+        return splitCnt;
+    }
+
+    /**
+     * Weighted node.
+     */
+    private static class WeightedNode implements Comparable<WeightedNode> {
+        /** Node ID. */
+        private final UUID nodeId;
+
+        /** Weight. */
+        private int weight;
+
+        /** Floating point weight. */
+        private float floatWeight;
+
+        /**
+         * Constructor.
+         *
+         * @param nodeId Node ID.
+         * @param weight Weight.
+         * @param floatWeight Floating point weight.
+         */
+        private WeightedNode(UUID nodeId, int weight, float floatWeight) {
+            this.nodeId = nodeId;
+            this.weight = weight;
+            this.floatWeight = floatWeight;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return nodeId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(@NotNull WeightedNode other) {
+            float res = other.floatWeight - floatWeight;
+
+            return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId);
+        }
+    }
+}
\ No newline at end of file
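
Note on extensibility: reducerNodeWeight() is deliberately protected, so a custom planner can bias reducer placement without reimplementing the plan logic. With the default weight of splitCnt, a node that received no mapper splits keeps floatWeight == 0 and never gets reducers. A minimal sketch of a subclass that spreads reducers across the whole topology instead (the class name and base weight are hypothetical, not part of this commit):

    import org.apache.ignite.cluster.ClusterNode;
    import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;

    /**
     * Hypothetical planner that gives every node a base weight of 1, so nodes
     * without any mapped splits may still receive reducers.
     */
    public class BaseWeightMapReducePlanner extends IgniteHadoopMapReducePlanner {
        /** {@inheritDoc} */
        @Override protected int reducerNodeWeight(ClusterNode node, int splitCnt) {
            return 1 + splitCnt; // Base weight plus data co-location weight.
        }
    }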