This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 98825b6 [FLINK-12178] Remove legacy Scheduler 98825b6 is described below commit 98825b69084ade03dcc302d9cdf6d9a74a27f9d1 Author: leesf <490081...@qq.com> AuthorDate: Mon Jun 10 19:13:37 2019 +0800 [FLINK-12178] Remove legacy Scheduler This closes #8676. --- .../jobmanager/scheduler/CoLocationConstraint.java | 2 +- .../runtime/jobmanager/scheduler/Scheduler.java | 887 --------------------- 2 files changed, 1 insertion(+), 888 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java index 23c9c21..8750749 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java @@ -99,7 +99,7 @@ public class CoLocationConstraint { * * @return True if the location has been assigned and the shared slot is alive, * false otherwise. - * @deprecated Should only be called by legacy code (if using {@link Scheduler}) + * @deprecated Should only be called by legacy code */ @Deprecated public boolean isAssignedAndAlive() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java deleted file mode 100644 index 1c41c00..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ /dev/null @@ -1,887 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.scheduler; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.SlotProfile; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceDiedException; -import org.apache.flink.runtime.instance.InstanceListener; -import org.apache.flink.runtime.instance.SharedSlot; -import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; -import org.apache.flink.runtime.instance.SlotSharingGroupId; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; - -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots. - * - * <p>The scheduler supports two scheduling modes:</p> - * <ul> - * <li>Immediate scheduling: A request for a task slot immediately returns a task slot, if one is - * available, or throws a {@link NoResourceAvailableException}.</li> - * <li>Queued Scheduling: A request for a task slot is queued and returns a future that will be - * fulfilled as soon as a slot becomes available.</li> - * </ul> - */ -public class Scheduler implements InstanceListener, SlotAvailabilityListener, SlotProvider { - - /** Scheduler-wide logger */ - private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class); - - - /** All modifications to the scheduler structures are performed under a global scheduler lock */ - private final Object globalLock = new Object(); - - /** All instances that the scheduler can deploy to */ - private final Set<Instance> allInstances = new HashSet<Instance>(); - - /** All instances by hostname */ - private final HashMap<String, Set<Instance>> allInstancesByHost = new HashMap<String, Set<Instance>>(); - - /** All instances that still have available resources */ - private final Map<ResourceID, Instance> instancesWithAvailableResources = new LinkedHashMap<>(); - - /** All tasks pending to be scheduled */ - private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>(); - - - private final BlockingQueue<Instance> newlyAvailableInstances = new LinkedBlockingQueue<Instance>(); - - /** The number of slot allocations that had no location preference */ - private int unconstrainedAssignments; - - /** The number of slot allocations where locality could be respected */ - private int localizedAssignments; - - /** The number of slot allocations where locality could not be respected */ - private int nonLocalizedAssignments; - - /** The Executor which is used to execute newSlotAvailable futures. */ - private final Executor executor; - - // ------------------------------------------------------------------------ - - /** - * Creates a new scheduler. - */ - public Scheduler(Executor executor) { - this.executor = Preconditions.checkNotNull(executor); - } - - /** - * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler. - */ - public void shutdown() { - synchronized (globalLock) { - for (Instance i : allInstances) { - i.removeSlotListener(); - i.cancelAndReleaseAllSlots(); - } - allInstances.clear(); - allInstancesByHost.clear(); - instancesWithAvailableResources.clear(); - taskQueue.clear(); - } - } - - // ------------------------------------------------------------------------ - // Scheduling - // ------------------------------------------------------------------------ - - - @Override - public CompletableFuture<LogicalSlot> allocateSlot( - SlotRequestId slotRequestId, - ScheduledUnit task, - SlotProfile slotProfile, - boolean allowQueued, - Time allocationTimeout) { - - try { - final Object ret = scheduleTask(task, allowQueued, slotProfile.getPreferredLocations()); - - if (ret instanceof SimpleSlot) { - return CompletableFuture.completedFuture((SimpleSlot) ret); - } - else if (ret instanceof CompletableFuture) { - @SuppressWarnings("unchecked") - CompletableFuture<LogicalSlot> typed = (CompletableFuture<LogicalSlot>) ret; - return FutureUtils.orTimeout(typed, allocationTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - } - else { - // this should never happen, simply guard this case with an exception - throw new RuntimeException(); - } - } catch (NoResourceAvailableException e) { - return FutureUtils.completedExceptionally(e); - } - } - - @Override - public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { - } - - /** - * Returns either a {@link SimpleSlot}, or a {@link CompletableFuture}. - */ - private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource, Iterable<TaskManagerLocation> preferredLocations) throws NoResourceAvailableException { - if (task == null) { - throw new NullPointerException(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Scheduling task " + task); - } - - final ExecutionVertex vertex = task.getTaskToExecute().getVertex(); - - final boolean forceExternalLocation = false && - preferredLocations != null && preferredLocations.iterator().hasNext(); - - synchronized (globalLock) { - - SlotSharingGroup sharingUnit = vertex.getJobVertex().getSlotSharingGroup(); - - if (sharingUnit != null) { - - // 1) === If the task has a slot sharing group, schedule with shared slots === - - if (queueIfNoResource) { - throw new IllegalArgumentException( - "A task with a vertex sharing group was scheduled in a queued fashion."); - } - - final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment(); - final CoLocationConstraint constraint = task.getCoLocationConstraint(); - - // sanity check that we do not use an externally forced location and a co-location constraint together - if (constraint != null && forceExternalLocation) { - throw new IllegalArgumentException("The scheduling cannot be constrained simultaneously by a " - + "co-location constraint and an external location constraint."); - } - - // get a slot from the group, if the group has one for us (and can fulfill the constraint) - final SimpleSlot slotFromGroup; - if (constraint == null) { - slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations); - } - else { - slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations); - } - - SimpleSlot newSlot = null; - SimpleSlot toUse = null; - - // the following needs to make sure any allocated slot is released in case of an error - try { - - // check whether the slot from the group is already what we want. - // any slot that is local, or where the assignment was unconstrained is good! - if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) { - - // if this is the first slot for the co-location constraint, we lock - // the location, because we are quite happy with the slot - if (constraint != null && !constraint.isAssigned()) { - constraint.lockLocation(); - } - - updateLocalityCounters(slotFromGroup, vertex); - return slotFromGroup; - } - - // the group did not have a local slot for us. see if we can one (or a better one) - - // our location preference is either determined by the location constraint, or by the - // vertex's preferred locations - final Iterable<TaskManagerLocation> locations; - final boolean localOnly; - if (constraint != null && constraint.isAssigned()) { - locations = Collections.singleton(constraint.getLocation()); - localOnly = true; - } - else { - locations = preferredLocations; - localOnly = forceExternalLocation; - } - - newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly); - - if (newSlot == null) { - if (slotFromGroup == null) { - // both null, which means there is nothing available at all - - if (constraint != null && constraint.isAssigned()) { - // nothing is available on the node where the co-location constraint forces us to - throw new NoResourceAvailableException("Could not allocate a slot on instance " + - constraint.getLocation() + ", as required by the co-location constraint."); - } - else if (forceExternalLocation) { - // could not satisfy the external location constraint - String hosts = getHostnamesFromInstances(preferredLocations); - throw new NoResourceAvailableException("Could not schedule task " + vertex - + " to any of the required hosts: " + hosts); - } - else { - // simply nothing is available - throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), - getTotalNumberOfSlots(), getNumberOfAvailableSlots()); - } - } - else { - // got a non-local from the group, and no new one, so we use the non-local - // slot from the sharing group - toUse = slotFromGroup; - } - } - else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) { - // if there is no slot from the group, or the new slot is local, - // then we use the new slot - if (slotFromGroup != null) { - slotFromGroup.releaseSlot(null); - } - toUse = newSlot; - } - else { - // both are available and usable. neither is local. in that case, we may - // as well use the slot from the sharing group, to minimize the number of - // instances that the job occupies - newSlot.releaseSlot(null); - toUse = slotFromGroup; - } - - // if this is the first slot for the co-location constraint, we lock - // the location, because we are going to use that slot - if (constraint != null && !constraint.isAssigned()) { - constraint.lockLocation(); - } - - updateLocalityCounters(toUse, vertex); - } - catch (NoResourceAvailableException e) { - throw e; - } - catch (Throwable t) { - if (slotFromGroup != null) { - slotFromGroup.releaseSlot(t); - } - if (newSlot != null) { - newSlot.releaseSlot(t); - } - - ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group"); - } - - return toUse; - } - else { - - // 2) === schedule without hints and sharing === - - SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); - if (slot != null) { - updateLocalityCounters(slot, vertex); - return slot; - } - else { - // no resource available now, so queue the request - if (queueIfNoResource) { - CompletableFuture<LogicalSlot> future = new CompletableFuture<>(); - this.taskQueue.add(new QueuedTask(task, future)); - return future; - } - else if (forceExternalLocation) { - String hosts = getHostnamesFromInstances(preferredLocations); - throw new NoResourceAvailableException("Could not schedule task " + vertex - + " to any of the required hosts: " + hosts); - } - else { - throw new NoResourceAvailableException(getNumberOfAvailableInstances(), - getTotalNumberOfSlots(), getNumberOfAvailableSlots()); - } - } - } - } - } - - /** - * Gets a suitable instance to schedule the vertex execution to. - * <p> - * NOTE: This method does is not thread-safe, it needs to be synchronized by the caller. - * - * @param vertex The task to run. - * @return The instance to run the vertex on, it {@code null}, if no instance is available. - */ - protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, - Iterable<TaskManagerLocation> requestedLocations, - boolean localOnly) { - // we need potentially to loop multiple times, because there may be false positives - // in the set-with-available-instances - while (true) { - Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); - - if (instanceLocalityPair == null){ - return null; - } - - Instance instanceToUse = instanceLocalityPair.getLeft(); - Locality locality = instanceLocalityPair.getRight(); - - try { - SimpleSlot slot = instanceToUse.allocateSimpleSlot(); - - // if the instance has further available slots, re-add it to the set of available resources. - if (instanceToUse.hasResourcesAvailable()) { - this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse); - } - - if (slot != null) { - slot.setLocality(locality); - return slot; - } - } - catch (InstanceDiedException e) { - // the instance died it has not yet been propagated to this scheduler - // remove the instance from the set of available instances - removeInstance(instanceToUse); - } - - // if we failed to get a slot, fall through the loop - } - } - - /** - * Tries to allocate a new slot for a vertex that is part of a slot sharing group. If one - * of the instances has a slot available, the method will allocate it as a shared slot, add that - * shared slot to the sharing group, and allocate a simple slot from that shared slot. - * - * <p>This method will try to allocate a slot from one of the local instances, and fall back to - * non-local instances, if permitted.</p> - * - * @param vertex The vertex to allocate the slot for. - * @param requestedLocations The locations that are considered local. May be null or empty, if the - * vertex has no location preferences. - * @param groupAssignment The slot sharing group of the vertex. Mandatory parameter. - * @param constraint The co-location constraint of the vertex. May be null. - * @param localOnly Flag to indicate if non-local choices are acceptable. - * - * @return A sub-slot for the given vertex, or {@code null}, if no slot is available. - */ - protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex, - Iterable<TaskManagerLocation> requestedLocations, - SlotSharingGroupAssignment groupAssignment, - CoLocationConstraint constraint, - boolean localOnly) - { - // we need potentially to loop multiple times, because there may be false positives - // in the set-with-available-instances - while (true) { - Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); - - if (instanceLocalityPair == null) { - // nothing is available - return null; - } - - final Instance instanceToUse = instanceLocalityPair.getLeft(); - final Locality locality = instanceLocalityPair.getRight(); - - try { - JobVertexID groupID = vertex.getJobvertexId(); - - // allocate a shared slot from the instance - SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(groupAssignment); - - // if the instance has further available slots, re-add it to the set of available resources. - if (instanceToUse.hasResourcesAvailable()) { - this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse); - } - - if (sharedSlot != null) { - // add the shared slot to the assignment group and allocate a sub-slot - SimpleSlot slot = constraint == null ? - groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupID) : - groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, constraint); - - if (slot != null) { - return slot; - } - else { - // could not add and allocate the sub-slot, so release shared slot - sharedSlot.releaseSlot(new FlinkException("Could not allocate sub-slot.")); - } - } - } - catch (InstanceDiedException e) { - // the instance died it has not yet been propagated to this scheduler - // remove the instance from the set of available instances - removeInstance(instanceToUse); - } - - // if we failed to get a slot, fall through the loop - } - } - - /** - * Tries to find a requested instance. If no such instance is available it will return a non- - * local instance. If no such instance exists (all slots occupied), then return null. - * - * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p> - * - * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that - * no locality preference exists. - * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen. - */ - private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) { - - // drain the queue of newly available instances - while (this.newlyAvailableInstances.size() > 0) { - Instance queuedInstance = this.newlyAvailableInstances.poll(); - if (queuedInstance != null) { - this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance); - } - } - - // if nothing is available at all, return null - if (this.instancesWithAvailableResources.isEmpty()) { - return null; - } - - Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator(); - - if (locations != null && locations.hasNext()) { - // we have a locality preference - - while (locations.hasNext()) { - TaskManagerLocation location = locations.next(); - if (location != null) { - Instance instance = instancesWithAvailableResources.remove(location.getResourceID()); - if (instance != null) { - return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL); - } - } - } - - // no local instance available - if (localOnly) { - return null; - } - else { - // take the first instance from the instances with resources - Iterator<Instance> instances = instancesWithAvailableResources.values().iterator(); - Instance instanceToUse = instances.next(); - instances.remove(); - - return new ImmutablePair<>(instanceToUse, Locality.NON_LOCAL); - } - } - else { - // no location preference, so use some instance - Iterator<Instance> instances = instancesWithAvailableResources.values().iterator(); - Instance instanceToUse = instances.next(); - instances.remove(); - - return new ImmutablePair<>(instanceToUse, Locality.UNCONSTRAINED); - } - } - - @Override - public void newSlotAvailable(final Instance instance) { - - // WARNING: The asynchrony here is necessary, because we cannot guarantee the order - // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks: - // - // -> The scheduler needs to grab them (1) global scheduler lock - // (2) slot/instance lock - // -> The slot releasing grabs (1) slot/instance (for releasing) and - // (2) scheduler (to check whether to take a new task item - // - // that leads with a high probability to deadlocks, when scheduling fast - - newlyAvailableInstances.add(instance); - - executor.execute(new Runnable() { - @Override - public void run() { - handleNewSlot(); - } - }); - } - - private void handleNewSlot() { - - synchronized (globalLock) { - Instance instance = this.newlyAvailableInstances.poll(); - if (instance == null || !instance.hasResourcesAvailable()) { - // someone else took it - return; - } - - QueuedTask queued = taskQueue.peek(); - - // the slot was properly released, we can allocate a new one from that instance - - if (queued != null) { - ScheduledUnit task = queued.getTask(); - ExecutionVertex vertex = task.getTaskToExecute().getVertex(); - - try { - SimpleSlot newSlot = instance.allocateSimpleSlot(); - if (newSlot != null) { - - // success, remove from the task queue and notify the future - taskQueue.poll(); - if (queued.getFuture() != null) { - try { - queued.getFuture().complete(newSlot); - } - catch (Throwable t) { - LOG.error("Error calling allocation future for task " + vertex.getTaskNameWithSubtaskIndex(), t); - task.getTaskToExecute().fail(t); - } - } - } - } - catch (InstanceDiedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Instance " + instance + " was marked dead asynchronously."); - } - - removeInstance(instance); - } - } - else { - this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); - } - } - } - - private void updateLocalityCounters(SimpleSlot slot, ExecutionVertex vertex) { - Locality locality = slot.getLocality(); - - switch (locality) { - case UNCONSTRAINED: - this.unconstrainedAssignments++; - break; - case LOCAL: - this.localizedAssignments++; - break; - case NON_LOCAL: - this.nonLocalizedAssignments++; - break; - default: - throw new RuntimeException(locality.name()); - } - - if (LOG.isDebugEnabled()) { - switch (locality) { - case UNCONSTRAINED: - LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot); - break; - case LOCAL: - LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot); - break; - case NON_LOCAL: - LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot); - break; - } - } - } - - // -------------------------------------------------------------------------------------------- - // Instance Availability - // -------------------------------------------------------------------------------------------- - - @Override - public void newInstanceAvailable(Instance instance) { - if (instance == null) { - throw new IllegalArgumentException(); - } - if (instance.getNumberOfAvailableSlots() <= 0) { - throw new IllegalArgumentException("The given instance has no resources."); - } - if (!instance.isAlive()) { - throw new IllegalArgumentException("The instance is not alive."); - } - - // synchronize globally for instance changes - synchronized (this.globalLock) { - - // check we do not already use this instance - if (!this.allInstances.add(instance)) { - throw new IllegalArgumentException("The instance is already contained."); - } - - try { - // make sure we get notifications about slots becoming available - instance.setSlotAvailabilityListener(this); - - // store the instance in the by-host-lookup - String instanceHostName = instance.getTaskManagerLocation().getHostname(); - Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName); - if (instanceSet == null) { - instanceSet = new HashSet<Instance>(); - allInstancesByHost.put(instanceHostName, instanceSet); - } - instanceSet.add(instance); - - // add it to the available resources and let potential waiters know - this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); - - // add all slots as available - for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) { - newSlotAvailable(instance); - } - } - catch (Throwable t) { - LOG.error("Scheduler could not add new instance " + instance, t); - removeInstance(instance); - } - } - } - - @Override - public void instanceDied(Instance instance) { - if (instance == null) { - throw new IllegalArgumentException(); - } - - instance.markDead(); - - // we only remove the instance from the pools, we do not care about the - synchronized (this.globalLock) { - // the instance must not be available anywhere any more - removeInstance(instance); - } - } - - private void removeInstance(Instance instance) { - if (instance == null) { - throw new NullPointerException(); - } - - allInstances.remove(instance); - instancesWithAvailableResources.remove(instance.getTaskManagerID()); - - String instanceHostName = instance.getTaskManagerLocation().getHostname(); - Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName); - if (instanceSet != null) { - instanceSet.remove(instance); - if (instanceSet.isEmpty()) { - allInstancesByHost.remove(instanceHostName); - } - } - } - - // -------------------------------------------------------------------------------------------- - // Status reporting - // -------------------------------------------------------------------------------------------- - - /** - * - * NOTE: In the presence of multi-threaded operations, this number may be inexact. - * - * @return The number of empty slots, for tasks. - */ - public int getNumberOfAvailableSlots() { - int count = 0; - - synchronized (globalLock) { - processNewlyAvailableInstances(); - - for (Instance instance : instancesWithAvailableResources.values()) { - count += instance.getNumberOfAvailableSlots(); - } - } - - return count; - } - - public int getTotalNumberOfSlots() { - int count = 0; - - synchronized (globalLock) { - for (Instance instance : allInstances) { - if (instance.isAlive()) { - count += instance.getTotalNumberOfSlots(); - } - } - } - - return count; - } - - public int getNumberOfAvailableInstances() { - int numberAvailableInstances = 0; - synchronized (this.globalLock) { - for (Instance instance: allInstances ){ - if (instance.isAlive()){ - numberAvailableInstances++; - } - } - } - - return numberAvailableInstances; - } - - public int getNumberOfInstancesWithAvailableSlots() { - synchronized (globalLock) { - processNewlyAvailableInstances(); - - return instancesWithAvailableResources.size(); - } - } - - public Map<String, List<Instance>> getInstancesByHost() { - synchronized (globalLock) { - HashMap<String, List<Instance>> copy = new HashMap<String, List<Instance>>(); - - for (Map.Entry<String, Set<Instance>> entry : allInstancesByHost.entrySet()) { - copy.put(entry.getKey(), new ArrayList<Instance>(entry.getValue())); - } - return copy; - } - } - - public int getNumberOfUnconstrainedAssignments() { - return unconstrainedAssignments; - } - - public int getNumberOfLocalizedAssignments() { - return localizedAssignments; - } - - public int getNumberOfNonLocalizedAssignments() { - return nonLocalizedAssignments; - } - - // -------------------------------------------------------------------------------------------- - - private void processNewlyAvailableInstances() { - synchronized (globalLock) { - Instance instance; - - while ((instance = newlyAvailableInstances.poll()) != null) { - if (instance.hasResourcesAvailable()) { - instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); - } - } - } - } - - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static String getHostnamesFromInstances(Iterable<TaskManagerLocation> locations) { - StringBuilder bld = new StringBuilder(); - - boolean successive = false; - for (TaskManagerLocation loc : locations) { - if (successive) { - bld.append(", "); - } else { - successive = true; - } - bld.append(loc.getHostname()); - } - - return bld.toString(); - } - - // ------------------------------------------------------------------------ - // Nested members - // ------------------------------------------------------------------------ - - /** - * An entry in the queue of schedule requests. Contains the task to be scheduled and - * the future that tracks the completion. - */ - private static final class QueuedTask { - - private final ScheduledUnit task; - - private final CompletableFuture<LogicalSlot> future; - - - public QueuedTask(ScheduledUnit task, CompletableFuture<LogicalSlot> future) { - this.task = task; - this.future = future; - } - - public ScheduledUnit getTask() { - return task; - } - - public CompletableFuture<LogicalSlot> getFuture() { - return future; - } - } - - // ------------------------------------------------------------------------ - // Testing methods - // ------------------------------------------------------------------------ - - @VisibleForTesting - @Nullable - public Instance getInstance(ResourceID resourceId) { - for (Instance instance : allInstances) { - if (Objects.equals(resourceId, instance.getTaskManagerID())) { - return instance; - } - } - return null; - } -}