Repository: storm Updated Branches: refs/heads/1.x-branch e7fab41a1 -> 20b612c25
[STORM-2134] - improving the current scheduling strategy for RAS Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0d755eeb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0d755eeb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0d755eeb Branch: refs/heads/1.x-branch Commit: 0d755eeb71c33a5ced9d603f11d74f1546e5a544 Parents: e7fab41 Author: Boyang Jerry Peng <jerryp...@yahoo-inc.com> Authored: Thu Oct 6 13:45:55 2016 -0500 Committer: Boyang Jerry Peng <jerryp...@yahoo-inc.com> Committed: Thu Oct 13 15:25:39 2016 -0500 ---------------------------------------------------------------------- pom.xml | 6 + storm-core/pom.xml | 16 + .../storm/scheduler/resource/ResourceUtils.java | 22 + .../DefaultResourceAwareStrategy.java | 673 ++++++++++++------- .../TestDefaultResourceAwareStrategy.java | 8 +- 5 files changed, 472 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0d755eeb/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b60a1fa..562b8b5 100644 --- a/pom.xml +++ b/pom.xml @@ -206,6 +206,7 @@ <commons-io.version>2.5</commons-io.version> <commons-lang.version>2.5</commons-lang.version> <commons-exec.version>1.1</commons-exec.version> + <commons-collections.version>3.2.2</commons-collections.version> <commons-fileupload.version>1.3.2</commons-fileupload.version> <commons-codec.version>1.6</commons-codec.version> <commons-cli.version>1.3.1</commons-cli.version> @@ -568,6 +569,11 @@ <version>${commons-exec.version}</version> </dependency> <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + <version>${commons-collections.version}</version> + </dependency> + <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> 
<version>${commons-lang.version}</version> http://git-wip-us.apache.org/repos/asf/storm/blob/0d755eeb/storm-core/pom.xml ---------------------------------------------------------------------- diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 0b5ea02..b05fd35 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -161,6 +161,10 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </dependency> + <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <scope>compile</scope> @@ -502,6 +506,7 @@ <include>org.jgrapht:jgrapht-core</include> <include>org.apache.commons:commons-exec</include> <include>org.apache.commons:commons-compress</include> + <include>commons-collections:commons-collections</include> <include>org.apache.hadoop:hadoop-auth</include> <include>commons-io:commons-io</include> <include>commons-codec:commons-codec</include> @@ -664,6 +669,10 @@ <shadedPattern>org.apache.storm.shade.org.apache.commons.lang</shadedPattern> </relocation> <relocation> + <pattern>org.apache.commons.collections</pattern> + <shadedPattern>org.apache.storm.shade.org.apache.commons.collections</shadedPattern> + </relocation> + <relocation> <pattern>org.json.simple</pattern> <shadedPattern>org.apache.storm.shade.org.json.simple</shadedPattern> </relocation> @@ -780,6 +789,13 @@ </excludes> </filter> <filter> + <artifact>commons-collections:commons-collections</artifact> + <excludes> + <exclude>META-INF/LICENSE.txt</exclude> + <exclude>META-INF/NOTICE.txt</exclude> + </excludes> + </filter> + <filter> <artifact>commons-io:commons-io</artifact> <excludes> <exclude>META-INF/LICENSE.txt</exclude> http://git-wip-us.apache.org/repos/asf/storm/blob/0d755eeb/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java index 8700746..f3799af 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java @@ -181,4 +181,26 @@ public class ResourceUtils { } return str.toString(); } + + /** + * Calculate the sum of a collection of doubles + * @param list collection of doubles + * @return the sum of the collection of doubles + */ + public static double sum(Collection<Double> list) { + double sum = 0.0; + for (Double elem : list) { + sum += elem; + } + return sum; + } + + /** + * Calculate the average of a collection of doubles + * @param list a collection of doubles + * @return the average of the collection of doubles + */ + public static double avg(Collection<Double> list) { + return sum(list) / list.size(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/0d755eeb/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java index 8957dc0..df3300c 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java @@ -20,25 +20,26 @@ package org.apache.storm.scheduler.resource.strategies.scheduling; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import 
java.util.Map; -import java.util.Map.Entry; import java.util.Queue; -import java.util.TreeMap; import java.util.HashSet; -import java.util.Iterator; +import java.util.Set; import java.util.TreeSet; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.resource.RAS_Node; import org.apache.storm.scheduler.resource.RAS_Nodes; +import org.apache.storm.scheduler.resource.ResourceUtils; import org.apache.storm.scheduler.resource.SchedulingResult; import org.apache.storm.scheduler.resource.SchedulingState; import org.apache.storm.scheduler.resource.SchedulingStatus; +import org.apache.commons.collections.ListUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,13 +52,11 @@ public class DefaultResourceAwareStrategy implements IStrategy { private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class); private Cluster _cluster; private Topologies _topologies; - private RAS_Node refNode = null; private Map<String, List<String>> _clusterInfo; private RAS_Nodes _nodes; - private final double CPU_WEIGHT = 1.0; - private final double MEM_WEIGHT = 1.0; - private final double NETWORK_WEIGHT = 1.0; + private TreeSet<ObjectResources> _sortedRacks = null; + private Map<String, TreeSet<ObjectResources>> _rackIdToSortedNodes = new HashMap<String, TreeSet<ObjectResources>>(); public void prepare (SchedulingState schedulingState) { _cluster = schedulingState.cluster; @@ -67,29 +66,12 @@ public class DefaultResourceAwareStrategy implements IStrategy { LOG.debug(this.getClusterInfo()); } - //the returned TreeMap keeps the Components sorted - private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap( - Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) { - TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<>(); - Integer rank = 0; - for (Component ras_comp : ordered__Component_list) { - 
retMap.put(rank, new ArrayList<ExecutorDetails>()); - for(ExecutorDetails exec : ras_comp.execs) { - if(unassignedExecutors.contains(exec)) { - retMap.get(rank).add(exec); - } - } - rank++; - } - return retMap; - } - public SchedulingResult schedule(TopologyDetails td) { if (_nodes.getNodes().size() <= 0) { LOG.warn("No available nodes to schedule tasks on!"); return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!"); } - Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td); + Collection<ExecutorDetails> unassignedExecutors = new HashSet<ExecutorDetails>(_cluster.getUnassignedExecutors(td)); Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>(); LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors); Collection<ExecutorDetails> scheduledTasks = new ArrayList<>(); @@ -100,25 +82,16 @@ public class DefaultResourceAwareStrategy implements IStrategy { return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!"); } - Queue<Component> ordered__Component_list = bfs(td, spouts); + //order executors to be scheduled + List<ExecutorDetails> orderedExecutors = orderExecutors(td, unassignedExecutors); - Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors); Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors); - Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap); - //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth. - //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1. 
- for (int i = 0; i < longestPriorityListSize; i++) { - for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) { - Iterator<ExecutorDetails> it = entry.getValue().iterator(); - if (it.hasNext()) { - ExecutorDetails exec = it.next(); - LOG.debug("\n\nAttempting to schedule: {} of component {}[ REQ {} ] with rank {}", - new Object[] { exec, td.getExecutorToComponent().get(exec), - td.getTaskResourceReqList(exec), entry.getKey() }); - scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks); - it.remove(); - } - } + + for (ExecutorDetails exec : orderedExecutors) { + LOG.debug("\n\nAttempting to schedule: {} of component {}[ REQ {} ]", + exec, td.getExecutorToComponent().get(exec), + td.getTaskResourceReqList(exec)); + scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks); } executorsNotScheduled.removeAll(scheduledTasks); @@ -146,6 +119,14 @@ public class DefaultResourceAwareStrategy implements IStrategy { return result; } + /** + * Schedule executor exec from topology td + * + * @param exec the executor to schedule + * @param td the topology executor exec is a part of + * @param schedulerAssignmentMap the assignments already calculated + * @param scheduledTasks executors that have been scheduled + */ private void scheduleExecutor(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap, Collection<ExecutorDetails> scheduledTasks) { WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap); @@ -167,69 +148,59 @@ public class DefaultResourceAwareStrategy implements IStrategy { } } + /** + * Find a worker to schedule executor exec on + * + * @param exec the executor to schedule + * @param td the topology that the executor is a part of + * @param scheduleAssignmentMap already calculated assignments + * @return a worker to assign exec on. 
Returns null if a worker cannot be successfully found in cluster + */ private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) { WorkerSlot ws = null; - // first scheduling - if (this.refNode == null) { - // iterate through an ordered list of all racks available to make sure we cannot schedule the first executor in any rack before we "give up" - // the list is ordered in decreasing order of effective resources. With the rack in the front of the list having the most effective resources. - for (RackResources rack : sortRacks(td.getId())) { - ws = this.getBestWorker(exec, td, rack.id, scheduleAssignmentMap); - if (ws != null) { - LOG.debug("best rack: {}", rack.id); - break; - } - } - } else { - ws = this.getBestWorker(exec, td, scheduleAssignmentMap); + + // iterate through an ordered list of all racks available to make sure we cannot schedule the first executor in any rack before we "give up" + // the list is ordered in decreasing order of effective resources. With the rack in the front of the list having the most effective resources. 
+ if (_sortedRacks == null) { + _sortedRacks = sortRacks(td.getId(), scheduleAssignmentMap); } - if (ws != null) { - this.refNode = this.idToNode(ws.getNodeId()); + + for (ObjectResources rack : _sortedRacks) { + ws = this.getBestWorker(exec, td, rack.id, scheduleAssignmentMap); + if (ws != null) { + LOG.debug("best rack: {}", rack.id); + break; + } } - LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode); return ws; } - private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) { - return this.getBestWorker(exec, td, null, scheduleAssignmentMap); - } - + /** + * Get the best worker to assign executor exec on a rack + * + * @param exec the executor to schedule + * @param td the topology that the executor is a part of + * @param rackId the rack id of the rack to find a worker on + * @param scheduleAssignmentMap already calculated assignments + * @return a worker to assign executor exec to. 
Returns null if a worker cannot be successfully found on rack with rackId + */ private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String rackId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) { - double taskMem = td.getTotalMemReqTask(exec); - double taskCPU = td.getTotalCpuReqTask(exec); - List<RAS_Node> nodes; - if(rackId != null) { - nodes = this.getAvailableNodesFromRack(rackId); - } else { - nodes = this.getAvailableNodes(); + if (!_rackIdToSortedNodes.containsKey(rackId)) { + _rackIdToSortedNodes.put(rackId, sortNodes(this.getAvailableNodesFromRack(rackId), rackId, td.getId(), scheduleAssignmentMap)); } - //First sort nodes by distance - TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>(); - for (RAS_Node n : nodes) { - if(n.getFreeSlots().size()>0) { - if (n.getAvailableMemoryResources() >= taskMem - && n.getAvailableCpuResources() >= taskCPU) { - double a = Math.pow(((taskCPU - n.getAvailableCpuResources())/(n.getAvailableCpuResources() + 1)) - * this.CPU_WEIGHT, 2); - double b = Math.pow(((taskMem - n.getAvailableMemoryResources())/(n.getAvailableMemoryResources() + 1)) - * this.MEM_WEIGHT, 2); - double c = 0.0; - if(this.refNode != null) { - c = Math.pow(this.distToNode(this.refNode, n) - * this.NETWORK_WEIGHT, 2); + + TreeSet<ObjectResources> sortedNodes = _rackIdToSortedNodes.get(rackId); + + double taskMem = td.getTotalMemReqTask(exec); + double taskCPU = td.getTotalCpuReqTask(exec); + for (ObjectResources nodeResources : sortedNodes) { + RAS_Node n = _nodes.getNodeById(nodeResources.id); + if (n.getAvailableCpuResources() >= taskCPU && n.getAvailableMemoryResources() >= taskMem && n.getFreeSlots().size() > 0) { + for (WorkerSlot ws : n.getFreeSlots()) { + if (checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) { + return ws; } - double distance = Math.sqrt(a + b + c); - nodeRankMap.put(distance, n); - } - } - } - //Then, pick worker from closest node that satisfy constraints - 
for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) { - RAS_Node n = entry.getValue(); - for(WorkerSlot ws : n.getFreeSlots()) { - if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) { - return ws; } } } @@ -237,18 +208,40 @@ public class DefaultResourceAwareStrategy implements IStrategy { } /** - * class to keep track of resources on a rack + * interface for calculating the number of existing executors scheduled on a object (rack or node) + */ + private interface ExistingScheduleFunc { + int getNumExistingSchedule(String objectId); + } + + /** + * a class to contain individual object resources as well as cumulative stats + */ + static class AllResources { + List<ObjectResources> objectResources = new LinkedList<ObjectResources>(); + double availMemResourcesOverall = 0.0; + double totalMemResourcesOverall = 0.0; + double availCpuResourcesOverall = 0.0; + double totalCpuResourcesOverall = 0.0; + String identifier; + + public AllResources(String identifier) { + this.identifier = identifier; + } + } + + /** + * class to keep track of resources on a rack or node */ - class RackResources { + static class ObjectResources { String id; double availMem = 0.0; double totalMem = 0.0; double availCpu = 0.0; double totalCpu = 0.0; - int freeSlots = 0; - int totalSlots = 0; double effectiveResources = 0.0; - public RackResources(String id) { + + public ObjectResources(String id) { this.id = id; } @@ -259,8 +252,87 @@ public class DefaultResourceAwareStrategy implements IStrategy { } /** + * Sorted Nodes * - * @param topoId + * @param availNodes a list of all the nodes we want to sort + * @param rackId the rack id availNodes are a part of + * @param topoId the topology that we are trying to schedule + * @param scheduleAssignmentMap calculated assignments so far + * @return a sorted list of nodes + * <p> + * Nodes are sorted by two criteria. 1) the number executors of the topology that needs to be scheduled is already on the node in descending order. 
+ * The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same node as the existing executors of the topology. + * 2) the subordinate/subservient resource availability percentage of a node in descending order + * We calculate the resource availability percentage by dividing the resource availability on the node by the resource availability of the entire rack + * By doing this calculation, nodes that have exhausted or little of one of the resources mentioned above will be ranked after nodes that have more balanced resource availability. + * So we will be less likely to pick a node that have a lot of one resource but a low amount of another. + */ + private TreeSet<ObjectResources> sortNodes(List<RAS_Node> availNodes, String rackId, final String topoId, final Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) { + AllResources allResources = new AllResources("RACK"); + List<ObjectResources> nodes = allResources.objectResources; + final Map<String, String> nodeIdToRackId = new HashMap<String, String>(); + + for (RAS_Node ras_node : availNodes) { + String nodeId = ras_node.getId(); + ObjectResources node = new ObjectResources(nodeId); + + double availMem = ras_node.getAvailableMemoryResources(); + double availCpu = ras_node.getAvailableCpuResources(); + int freeSlots = ras_node.totalSlotsFree(); + double totalMem = ras_node.getTotalMemoryResources(); + double totalCpu = ras_node.getTotalCpuResources(); + int totalSlots = ras_node.totalSlots(); + + node.availMem = availMem; + node.totalMem = totalMem; + node.availCpu = availCpu; + node.totalCpu = totalCpu; + nodes.add(node); + + allResources.availMemResourcesOverall += availMem; + allResources.availCpuResourcesOverall += availCpu; + + allResources.totalMemResourcesOverall += totalMem; + allResources.totalCpuResourcesOverall += totalCpu; + } + + + LOG.debug("Rack {}: Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM {} ]", + rackId, 
allResources.availCpuResourcesOverall, allResources.availMemResourcesOverall, allResources.totalCpuResourcesOverall, allResources.totalMemResourcesOverall); + + return sortObjectResources(allResources, new ExistingScheduleFunc() { + @Override + public int getNumExistingSchedule(String objectId) { + + //Get execs already assigned in rack + Collection<ExecutorDetails> execs = new LinkedList<ExecutorDetails>(); + if (_cluster.getAssignmentById(topoId) != null) { + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : _cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) { + WorkerSlot workerSlot = entry.getValue(); + ExecutorDetails exec = entry.getKey(); + if (workerSlot.getNodeId().equals(objectId)) { + execs.add(exec); + } + } + } + // get execs already scheduled in the current scheduling + for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : scheduleAssignmentMap.entrySet()) { + + WorkerSlot workerSlot = entry.getKey(); + if (workerSlot.getNodeId().equals(objectId)) { + execs.addAll(entry.getValue()); + } + } + return execs.size(); + } + }); + } + + /** + * Sort racks + * + * @param topoId topology id + * @param scheduleAssignmentMap calculated assignments so far * @return a sorted list of racks * Racks are sorted by two criteria. 1) the number executors of the topology that needs to be scheduled is already on the rack in descending order. * The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same rack as the existing executors of the topology. @@ -269,117 +341,161 @@ public class DefaultResourceAwareStrategy implements IStrategy { * By doing this calculation, racks that have exhausted or little of one of the resources mentioned above will be ranked after racks that have more balanced resource availability. * So we will be less likely to pick a rack that have a lot of one resource but a low amount of another. 
*/ - TreeSet<RackResources> sortRacks(final String topoId) { - List<RackResources> racks = new LinkedList<RackResources>(); - final Map<String, String> nodeIdToRackId = new HashMap<String, String>(); - double availMemResourcesOverall = 0.0; - double totalMemResourcesOverall = 0.0; - - double availCpuResourcesOverall = 0.0; - double totalCpuResourcesOverall = 0.0; + TreeSet<ObjectResources> sortRacks(final String topoId, final Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) { + AllResources allResources = new AllResources("Cluster"); + List<ObjectResources> racks = allResources.objectResources; - int freeSlotsOverall = 0; - int totalSlotsOverall = 0; + final Map<String, String> nodeIdToRackId = new HashMap<String, String>(); - for (Entry<String, List<String>> entry : _clusterInfo.entrySet()) { + for (Map.Entry<String, List<String>> entry : _clusterInfo.entrySet()) { String rackId = entry.getKey(); List<String> nodeIds = entry.getValue(); - RackResources rack = new RackResources(rackId); + ObjectResources rack = new ObjectResources(rackId); racks.add(rack); for (String nodeId : nodeIds) { RAS_Node node = _nodes.getNodeById(this.NodeHostnameToId(nodeId)); double availMem = node.getAvailableMemoryResources(); double availCpu = node.getAvailableCpuResources(); - double freeSlots = node.totalSlotsFree(); double totalMem = node.getTotalMemoryResources(); double totalCpu = node.getTotalCpuResources(); - double totalSlots = node.totalSlots(); rack.availMem += availMem; rack.totalMem += totalMem; rack.availCpu += availCpu; rack.totalCpu += totalCpu; - rack.freeSlots += freeSlots; - rack.totalSlots += totalSlots; nodeIdToRackId.put(nodeId, rack.id); - availMemResourcesOverall += availMem; - availCpuResourcesOverall += availCpu; - freeSlotsOverall += freeSlots; + allResources.availMemResourcesOverall += availMem; + allResources.availCpuResourcesOverall += availCpu; - totalMemResourcesOverall += totalMem; - totalCpuResourcesOverall += totalCpu; - 
totalSlotsOverall += totalSlots; + allResources.totalMemResourcesOverall += totalMem; + allResources.totalCpuResourcesOverall += totalCpu; } } + LOG.debug("Cluster Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM {} ]", + allResources.availCpuResourcesOverall, allResources.availMemResourcesOverall, allResources.totalCpuResourcesOverall, allResources.totalMemResourcesOverall); + + return sortObjectResources(allResources, new ExistingScheduleFunc() { + @Override + public int getNumExistingSchedule(String objectId) { + + String rackId = objectId; + //Get execs already assigned in rack + Collection<ExecutorDetails> execs = new LinkedList<ExecutorDetails>(); + if (_cluster.getAssignmentById(topoId) != null) { + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : _cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) { + String nodeId = entry.getValue().getNodeId(); + String hostname = idToNode(nodeId).getHostname(); + ExecutorDetails exec = entry.getKey(); + if (nodeIdToRackId.get(hostname) != null && nodeIdToRackId.get(hostname).equals(rackId)) { + execs.add(exec); + } + } + } + // get execs already scheduled in the current scheduling + for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : scheduleAssignmentMap.entrySet()) { + WorkerSlot workerSlot = entry.getKey(); + String nodeId = workerSlot.getNodeId(); + String hostname = idToNode(nodeId).getHostname(); + if (nodeIdToRackId.get(hostname).equals(rackId)) { + execs.addAll(entry.getValue()); + } + } + return execs.size(); + } + }); + } + + /** + * Sort objects by the following two criteria. 1) the number executors of the topology that needs to be scheduled is already on the object (node or rack) in descending order. + * The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same object (node or rack) as the existing executors of the topology. 
+ * 2) the subordinate/subservient resource availability percentage of a rack in descending order + * We calculate the resource availability percentage by dividing the resource availability of the object (node or rack) by the resource availability of the entire rack or cluster depending on if object + * references a node or a rack. + * By doing this calculation, objects (node or rack) that have exhausted or little of one of the resources mentioned above will be ranked after racks that have more balanced resource availability. + * So we will be less likely to pick a rack that have a lot of one resource but a low amount of another. + * + * @param allResources contains all individual ObjectResources as well as cumulative stats + * @param existingScheduleFunc a function to get existing executors already scheduled on this object + * @return a sorted list of ObjectResources + */ + private TreeSet<ObjectResources> sortObjectResources(final AllResources allResources, final ExistingScheduleFunc existingScheduleFunc) { - LOG.debug("Cluster Overall Avail [ CPU {} MEM {} Slots {} ] Total [ CPU {} MEM {} Slots {} ]", - availCpuResourcesOverall, availMemResourcesOverall, freeSlotsOverall, totalCpuResourcesOverall, totalMemResourcesOverall, totalSlotsOverall); - for (RackResources rack : racks) { - if (availCpuResourcesOverall <= 0.0 || availMemResourcesOverall <= 0.0 || freeSlotsOverall <= 0.0) { - rack.effectiveResources = 0.0; + for (ObjectResources objectResources : allResources.objectResources) { + StringBuilder sb = new StringBuilder(); + if (allResources.availCpuResourcesOverall <= 0.0 || allResources.availMemResourcesOverall <= 0.0) { + objectResources.effectiveResources = 0.0; } else { - rack.effectiveResources = Math.min(Math.min((rack.availCpu / availCpuResourcesOverall), (rack.availMem / availMemResourcesOverall)), ((double) rack.freeSlots / (double) freeSlotsOverall)); + List<Double> values = new LinkedList<Double>(); + + //add cpu + double cpuPercent = 
(objectResources.availCpu / allResources.availCpuResourcesOverall) * 100.0; + values.add(cpuPercent); + sb.append(String.format("CPU %f(%f%%) ", objectResources.availCpu, cpuPercent)); + + //add memory + double memoryPercent = (objectResources.availMem / allResources.availMemResourcesOverall) * 100.0; + values.add(memoryPercent); + sb.append(String.format("MEM %f(%f%%) ", objectResources.availMem, memoryPercent)); + + objectResources.effectiveResources = Collections.min(values); } - LOG.debug("rack: {} Avail [ CPU {}({}%) MEM {}({}%) Slots {}({}%) ] Total [ CPU {} MEM {} Slots {} ] effective resources: {}", - rack.id, rack.availCpu, rack.availCpu/availCpuResourcesOverall * 100.0, rack.availMem, rack.availMem/availMemResourcesOverall * 100.0, - rack.freeSlots, ((double) rack.freeSlots / (double) freeSlotsOverall) * 100.0, rack.totalCpu, rack.totalMem, rack.totalSlots, rack.effectiveResources); + LOG.debug("{}: Avail [ {} ] Total [ CPU {} MEM {}] effective resources: {}", + objectResources.id, sb.toString(), + objectResources.totalCpu, objectResources.totalMem, objectResources.effectiveResources); } - TreeSet<RackResources> sortedRacks = new TreeSet<RackResources>(new Comparator<RackResources>() { + TreeSet<ObjectResources> sortedObjectResources = new TreeSet<ObjectResources>(new Comparator<ObjectResources>() { @Override - public int compare(RackResources o1, RackResources o2) { + public int compare(ObjectResources o1, ObjectResources o2) { - int execsScheduledInRack1 = getExecutorsScheduledinRack(topoId, o1.id, nodeIdToRackId).size(); - int execsScheduledInRack2 = getExecutorsScheduledinRack(topoId, o2.id, nodeIdToRackId).size(); - if (execsScheduledInRack1 > execsScheduledInRack2) { + int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id); + int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id); + if (execsScheduled1 > execsScheduled2) { return -1; - } else if (execsScheduledInRack1 < execsScheduledInRack2) { + } else if 
(execsScheduled1 < execsScheduled2) { return 1; } else { if (o1.effectiveResources > o2.effectiveResources) { return -1; } else if (o1.effectiveResources < o2.effectiveResources) { return 1; - } - else { - return o1.id.compareTo(o2.id); + } else { + List<Double> o1_values = new LinkedList<Double>(); + List<Double> o2_values = new LinkedList<Double>(); + o1_values.add((o1.availCpu / allResources.availCpuResourcesOverall) * 100.0); + o2_values.add((o2.availCpu / allResources.availCpuResourcesOverall) * 100.0); + + o1_values.add((o1.availMem / allResources.availMemResourcesOverall) * 100.0); + o2_values.add((o2.availMem / allResources.availMemResourcesOverall) * 100.0); + + double o1_avg = ResourceUtils.avg(o1_values); + double o2_avg = ResourceUtils.avg(o2_values); + + if (o1_avg > o2_avg) { + return -1; + } else if (o1_avg < o2_avg) { + return 1; + } else { + return o1.id.compareTo(o2.id); + } } } } }); - sortedRacks.addAll(racks); - LOG.debug("Sorted rack: {}", sortedRacks); - return sortedRacks; - } - - private Collection<ExecutorDetails> getExecutorsScheduledinRack(String topoId, String rackId, Map<String, String> nodeToRack) { - Collection<ExecutorDetails> execs = new LinkedList<ExecutorDetails>(); - if (_cluster.getAssignmentById(topoId) != null) { - for (Entry<ExecutorDetails, WorkerSlot> entry : _cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) { - String nodeId = entry.getValue().getNodeId(); - String hostname = idToNode(nodeId).getHostname(); - ExecutorDetails exec = entry.getKey(); - if (nodeToRack.get(hostname) != null && nodeToRack.get(hostname).equals(rackId)) { - execs.add(exec); - } - } - } - return execs; - } - - private Double distToNode(RAS_Node src, RAS_Node dest) { - if (src.getId().equals(dest.getId())) { - return 0.0; - } else if (this.nodeToRack(src).equals(this.nodeToRack(dest))) { - return 0.5; - } else { - return 1.0; - } + sortedObjectResources.addAll(allResources.objectResources); + LOG.debug("Sorted Object Resources: 
{}", sortedObjectResources); + return sortedObjectResources; } + /** + * Get the rack on which a node is a part of + * + * @param node the node to find out which rack its on + * @return the rack id + */ private String nodeToRack(RAS_Node node) { - for (Entry<String, List<String>> entry : _clusterInfo + for (Map.Entry<String, List<String>> entry : _clusterInfo .entrySet()) { if (entry.getValue().contains(node.getHostname())) { return entry.getKey(); @@ -389,14 +505,12 @@ public class DefaultResourceAwareStrategy implements IStrategy { return null; } - private List<RAS_Node> getAvailableNodes() { - LinkedList<RAS_Node> nodes = new LinkedList<>(); - for (String rackId : _clusterInfo.keySet()) { - nodes.addAll(this.getAvailableNodesFromRack(rackId)); - } - return nodes; - } - + /** + * get a list nodes from a rack + * + * @param rackId the rack id of the rack to get nodes from + * @return a list of nodes + */ private List<RAS_Node> getAvailableNodesFromRack(String rackId) { List<RAS_Node> retList = new ArrayList<>(); for (String node_id : _clusterInfo.get(rackId)) { @@ -406,59 +520,127 @@ public class DefaultResourceAwareStrategy implements IStrategy { return retList; } - private List<WorkerSlot> getAvailableWorkersFromRack(String rackId) { - List<RAS_Node> nodes = this.getAvailableNodesFromRack(rackId); - List<WorkerSlot> workers = new LinkedList<>(); - for(RAS_Node node : nodes) { - workers.addAll(node.getFreeSlots()); - } - return workers; + /** + * sort components by the number of in and out connections that need to be made + * + * @param componentMap The components that need to be sorted + * @return a sorted set of components + */ + private Set<Component> sortComponents(final Map<String, Component> componentMap) { + Set<Component> sortedComponents = new TreeSet<Component>(new Comparator<Component>() { + @Override + public int compare(Component o1, Component o2) { + int connections1 = 0; + int connections2 = 0; + + for (String childId : (List<String>) 
ListUtils.union(o1.children, o1.parents)) { + connections1 += (componentMap.get(childId).execs.size() * o1.execs.size()); + } + + for (String childId : (List<String>) ListUtils.union(o2.children, o2.parents)) { + connections2 += (componentMap.get(childId).execs.size() * o2.execs.size()); + } + + if (connections1 > connections2) { + return -1; + } else if (connections1 < connections2) { + return 1; + } else { + return o1.id.compareTo(o2.id); + } + } + }); + sortedComponents.addAll(componentMap.values()); + return sortedComponents; } - private List<WorkerSlot> getAvailableWorker() { - List<WorkerSlot> workers = new LinkedList<>(); - for (String rackId : _clusterInfo.keySet()) { - workers.addAll(this.getAvailableWorkersFromRack(rackId)); - } - return workers; + /** + * Sort a component's neighbors by the number of connections it needs to make with this component + * + * @param thisComp the component that we need to sort its neighbors + * @param componentMap all the components to sort + * @return a sorted set of components + */ + private Set<Component> sortNeighbors(final Component thisComp, final Map<String, Component> componentMap) { + Set<Component> sortedComponents = new TreeSet<Component>(new Comparator<Component>() { + @Override + public int compare(Component o1, Component o2) { + int connections1 = o1.execs.size() * thisComp.execs.size(); + int connections2 = o2.execs.size() * thisComp.execs.size(); + if (connections1 > connections2) { + return -1; + } else if (connections1 < connections2) { + return 1; + } else { + return o1.id.compareTo(o2.id); + } + } + }); + sortedComponents.addAll(componentMap.values()); + return sortedComponents; } /** - * Breadth first traversal of the topology DAG - * @param td - * @param spouts - * @return A partial ordering of components + * Order executors based on how many in and out connections it will potentially need to make. + * First order components by the number of in and out connections it will have. 
Then iterate through the sorted list of components. + * For each component sort the neighbors of that component by how many connections it will have to make with that component. + * Add an executor from this component and then from each neighboring component in sorted order. Do this until there is nothing left to schedule + * + * @param td The topology the executors belong to + * @param unassignedExecutors a collection of unassigned executors that need to be assigned. Should only try to assign executors from this list + * @return a list of executors in sorted order */ - private Queue<Component> bfs(TopologyDetails td, List<Component> spouts) { - // Since queue is a interface - Queue<Component> ordered__Component_list = new LinkedList<Component>(); - HashSet<String> visited = new HashSet<>(); - - /* start from each spout that is not visited, each does a breadth-first traverse */ - for (Component spout : spouts) { - if (!visited.contains(spout.id)) { - Queue<Component> queue = new LinkedList<>(); - visited.add(spout.id); - queue.offer(spout); - while (!queue.isEmpty()) { - Component comp = queue.poll(); - ordered__Component_list.add(comp); - List<String> neighbors = new ArrayList<>(); - neighbors.addAll(comp.children); - neighbors.addAll(comp.parents); - for (String nbID : neighbors) { - if (!visited.contains(nbID)) { - Component child = td.getComponents().get(nbID); - visited.add(nbID); - queue.offer(child); - } - } + private List<ExecutorDetails> orderExecutors(TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) { + Map<String, Component> componentMap = td.getComponents(); + List<ExecutorDetails> execsScheduled = new LinkedList<>(); + + Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>(); + for (Component component : componentMap.values()) { + compToExecsToSchedule.put(component.id, new LinkedList<ExecutorDetails>()); + for (ExecutorDetails exec : component.execs) { + if (unassignedExecutors.contains(exec)) { + 
compToExecsToSchedule.get(component.id).add(exec); } } } - return ordered__Component_list; + + Set<Component> sortedComponents = sortComponents(componentMap); + sortedComponents.addAll(componentMap.values()); + + for (Component currComp : sortedComponents) { + Map<String, Component> neighbors = new HashMap<String, Component>(); + for (String compId : (List<String>) ListUtils.union(currComp.children, currComp.parents)) { + neighbors.put(compId, componentMap.get(compId)); + } + Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors); + Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.id); + + boolean flag = false; + do { + flag = false; + if (!currCompExesToSched.isEmpty()) { + execsScheduled.add(currCompExesToSched.poll()); + flag = true; + } + + for (Component neighborComp : sortedNeighbors) { + Queue<ExecutorDetails> neighborCompExesToSched = compToExecsToSchedule.get(neighborComp.id); + if (!neighborCompExesToSched.isEmpty()) { + execsScheduled.add(neighborCompExesToSched.poll()); + flag = true; + } + } + } while (flag); + } + return execsScheduled; } + /** + * Get a list of all the spouts in the topology + * + * @param td topology to get spouts from + * @return a list of spouts + */ private List<Component> getSpouts(TopologyDetails td) { List<Component> spouts = new ArrayList<>(); @@ -470,22 +652,12 @@ public class DefaultResourceAwareStrategy implements IStrategy { return spouts; } - private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) { - Integer mostNum = 0; - for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) { - Integer numExecs = execs.size(); - if (mostNum < numExecs) { - mostNum = numExecs; - } - } - return mostNum; - } - /** * Get the remaining amount memory that can be assigned to a worker given the set worker max heap size - * @param ws - * @param td - * @param scheduleAssignmentMap + * + * @param ws the worker to get the remaining amount of 
memory that can be assigned to it + * @param td the topology that has executors running on the worker + * @param scheduleAssignmentMap the schedulings calculated so far * @return The remaining amount of memory */ private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) { @@ -495,16 +667,17 @@ public class DefaultResourceAwareStrategy implements IStrategy { /** * Get the amount of memory already assigned to a worker - * @param ws - * @param td - * @param scheduleAssignmentMap + * + * @param ws the worker to get the amount of memory assigned to a worker + * @param td the topology that has executors running on the worker + * @param scheduleAssignmentMap the schedulings calculated so far * @return the amount of memory */ private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) { Double totalMem = 0.0; Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws); - if(execs != null) { - for(ExecutorDetails exec : execs) { + if (execs != null) { + for (ExecutorDetails exec : execs) { totalMem += td.getTotalMemReqTask(exec); } } @@ -514,15 +687,16 @@ public class DefaultResourceAwareStrategy implements IStrategy { /** * Checks whether we can schedule an Executor exec on the worker slot ws * Only considers memory currently. 
May include CPU in the future - * @param exec - * @param ws - * @param td - * @param scheduleAssignmentMap + * + * @param exec the executor to check whether it can be assigned to worker ws + * @param ws the worker to check whether executor exec can be assigned to it + * @param td the topology that the exec is from + * @param scheduleAssignmentMap the schedulings calculated so far * @return a boolean: True denoting the exec can be scheduled on ws and false if it cannot */ private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) { boolean retVal = false; - if(this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap) >= td.getTotalMemReqTask(exec)) { + if (this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap) >= td.getTotalMemReqTask(exec)) { retVal = true; } return retVal; @@ -530,14 +704,15 @@ public class DefaultResourceAwareStrategy implements IStrategy { /** * Get the amount of resources available and total for each node + * * @return a String with cluster resource info for debug */ private String getClusterInfo() { String retVal = "Cluster info:\n"; - for(Entry<String, List<String>> clusterEntry : _clusterInfo.entrySet()) { + for (Map.Entry<String, List<String>> clusterEntry : _clusterInfo.entrySet()) { String clusterId = clusterEntry.getKey(); retVal += "Rack: " + clusterId + "\n"; - for(String nodeHostname : clusterEntry.getValue()) { + for (String nodeHostname : clusterEntry.getValue()) { RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname)); retVal += "-> Node: " + node.getHostname() + " " + node.getId() + "\n"; retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + " Slots: " + node.totalSlotsFree() + "}\n"; @@ -549,7 +724,8 @@ public class DefaultResourceAwareStrategy implements IStrategy { /** * hostname to Id - * @param hostname + * + * 
@param hostname the hostname to convert to node id * @return the id of a node */ public String NodeHostnameToId(String hostname) { @@ -567,12 +743,13 @@ public class DefaultResourceAwareStrategy implements IStrategy { /** * Find RAS_Node for specified node id - * @param id + * + * @param id the node/supervisor id to lookup * @return a RAS_Node object */ public RAS_Node idToNode(String id) { RAS_Node ret = _nodes.getNodeById(id); - if(ret == null) { + if (ret == null) { LOG.error("Cannot find Node with Id: {}", id); } return ret; http://git-wip-us.apache.org/repos/asf/storm/blob/0d755eeb/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java index 77f23aa..f533543 100644 --- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java +++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java @@ -33,7 +33,7 @@ import org.apache.storm.scheduler.resource.RAS_Node; import org.apache.storm.scheduler.resource.RAS_Nodes; import org.apache.storm.scheduler.resource.ResourceAwareScheduler; import org.apache.storm.scheduler.resource.SchedulingResult; -import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.RackResources; +import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.ObjectResources; import org.apache.storm.scheduler.resource.SchedulingState; import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler; import org.apache.storm.scheduler.resource.User; @@ -268,10 +268,10 @@ public class 
TestDefaultResourceAwareStrategy { DefaultResourceAwareStrategy rs = new DefaultResourceAwareStrategy(); rs.prepare(new SchedulingState(new HashMap<String, User>(), cluster, topologies, config)); - TreeSet<RackResources> sortedRacks= rs.sortRacks(topo1.getId()); + TreeSet<ObjectResources> sortedRacks= rs.sortRacks(topo1.getId(), new HashMap<WorkerSlot, Collection<ExecutorDetails>>()); Assert.assertEquals("# of racks sorted", 5, sortedRacks.size()); - Iterator<RackResources> it = sortedRacks.iterator(); + Iterator<ObjectResources> it = sortedRacks.iterator(); // Ranked first since rack-0 has the most balanced set of resources Assert.assertEquals("rack-0 should be ordered first", "rack-0", it.next().id); // Ranked second since rack-1 has a balanced set of resources but less than rack-0 @@ -312,8 +312,6 @@ public class TestDefaultResourceAwareStrategy { cluster.assign(targetSlot, topo2.getId(), Arrays.asList(targetExec)); } - topologies.getById(topo2.getId()).getTotalMemoryResourceList(); - rs = new DefaultResourceAwareStrategy(); rs.prepare(new SchedulingState(new HashMap<String, User>(), cluster, topologies, config)); // schedule topo2