http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java new file mode 100644 index 0000000..a8aa1a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.state; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.providers.ProviderRole; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Binding information for application states; designed to be extensible + * so that tests don't have to be massivley reworked when new arguments + * are added. + */ +public class AppStateBindingInfo { + public AggregateConf instanceDefinition; + public Configuration serviceConfig = new Configuration(); + public Configuration publishedProviderConf = new Configuration(false); + public List<ProviderRole> roles = new ArrayList<>(); + public FileSystem fs; + public Path historyPath; + public List<Container> liveContainers = new ArrayList<>(0); + public Map<String, String> applicationInfo = new HashMap<>(); + public ContainerReleaseSelector releaseSelector = new SimpleReleaseSelector(); + /** node reports off the RM. 
*/ + public List<NodeReport> nodeReports = new ArrayList<>(0); + + public void validate() throws IllegalArgumentException { + Preconditions.checkArgument(instanceDefinition != null, "null instanceDefinition"); + Preconditions.checkArgument(serviceConfig != null, "null appmasterConfig"); + Preconditions.checkArgument(publishedProviderConf != null, "null publishedProviderConf"); + Preconditions.checkArgument(releaseSelector != null, "null releaseSelector"); + Preconditions.checkArgument(roles != null, "null providerRoles"); + Preconditions.checkArgument(fs != null, "null fs"); + Preconditions.checkArgument(historyPath != null, "null historyDir"); + Preconditions.checkArgument(nodeReports != null, "null nodeReports"); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java new file mode 100644 index 0000000..5b3a93c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.state; + +/** + * Outcome of the assignment + */ +public enum ContainerAllocationOutcome { + /** + * There wasn't a request for this + */ + Unallocated, + + /** + * Open placement + */ + Open, + + /** + * Allocated explicitly where requested + */ + Placed, + + /** + * This was an escalated placement + */ + Escalated +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java new file mode 100644 index 0000000..e80639e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.state; + +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; + +import java.util.ArrayList; +import java.util.List; + +/** + * This is just a tuple of the outcome of a container allocation + */ +public class ContainerAllocationResults { + + /** + * What was the outcome of this allocation: placed, escalated, ... + */ + public ContainerAllocationOutcome outcome; + + /** + * The outstanding request which originated this. + * This will be null if the outcome is {@link ContainerAllocationOutcome#Unallocated} + * as it wasn't expected. 
+ */ + public OutstandingRequest origin; + + /** + * A possibly empty list of requests to add to the follow-up actions + */ + public List<AbstractRMOperation> operations = new ArrayList<>(0); + + public ContainerAllocationResults() { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java new file mode 100644 index 0000000..3e8a3c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.state; + +import org.apache.hadoop.yarn.api.records.Container; + +/** + * Static assignment structure + */ +public class ContainerAssignment { + + /** + * Container that has been allocated + */ + public final Container container; + + /** + * Role to assign to it + */ + public final RoleStatus role; + + /** + * Placement outcome: was this from history or not + */ + public final ContainerAllocationOutcome placement; + + public ContainerAssignment(Container container, + RoleStatus role, + ContainerAllocationOutcome placement) { + this.container = container; + this.role = role; + this.placement = placement; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ContainerAssignment{"); + sb.append("container=").append(container); + sb.append(", role=").append(role); + sb.append(", placement=").append(placement); + sb.append('}'); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java new file mode 100644 index 0000000..59ab30b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.state; + +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; + +/** + * Container outcomes we care about; slightly simplified from + * {@link ContainerExitStatus} -and hopefully able to handle + * any new exit codes. + */ +public enum ContainerOutcome { + Completed, + Failed, + Failed_limits_exceeded, + Node_failure, + Preempted; + + /** + * Build a container outcome from an exit status. + * The values in {@link ContainerExitStatus} are used + * here. + * @param exitStatus exit status + * @return an enumeration of the outcome. + */ + public static ContainerOutcome fromExitStatus(int exitStatus) { + switch (exitStatus) { + case ContainerExitStatus.ABORTED: + case ContainerExitStatus.KILLED_BY_APPMASTER: + case ContainerExitStatus.KILLED_BY_RESOURCEMANAGER: + case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION: + // could either be a release or node failure. Treat as completion + return Completed; + case ContainerExitStatus.DISKS_FAILED: + return Node_failure; + case ContainerExitStatus.PREEMPTED: + return Preempted; + case ContainerExitStatus.KILLED_EXCEEDED_PMEM: + case ContainerExitStatus.KILLED_EXCEEDED_VMEM: + return Failed_limits_exceeded; + default: + return exitStatus == 0 ? 
Completed : Failed; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java new file mode 100644 index 0000000..df222fa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.state; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Locale; + +/** + * Class containing the logic to build/split container priorities into the + * different fields used by Slider + * + * The original design here had a requestID merged with the role, to + * track outstanding requests. However, this isn't possible, so + * the request ID has been dropped. A "location specified" flag was + * added to indicate whether or not the request was for a specific location + * -though this is currently unused. + * + * The methods are effectively surplus -but retained to preserve the + * option of changing behavior in future + */ +public final class ContainerPriority { + + // bit that represents whether location is specified + static final int NOLOCATION = 1 << 30; + + public static int buildPriority(int role, + boolean locationSpecified) { + int location = locationSpecified ? 0 : NOLOCATION; + return role | location; + } + + + public static Priority createPriority(int role, + boolean locationSpecified) { + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(ContainerPriority.buildPriority(role, + locationSpecified)); + return pri; + } + + public static int extractRole(int priority) { + return priority >= NOLOCATION ? 
priority ^ NOLOCATION : priority; + } + + /** + * Does the priority have location + * @param priority priority index + * @return true if the priority has the location marker + */ + public static boolean hasLocation(int priority) { + return (priority ^ NOLOCATION ) == 0; + } + + /** + * Map from a container to a role key by way of its priority + * @param container container + * @return role key + */ + public static int extractRole(Container container) { + Priority priority = container.getPriority(); + return extractRole(priority); + } + + /** + * Priority record to role mapper + * @param priorityRecord priority record + * @return the role # + */ + public static int extractRole(Priority priorityRecord) { + Preconditions.checkNotNull(priorityRecord); + return extractRole(priorityRecord.getPriority()); + } + + /** + * Convert a priority record to a string, extracting role and locality + * @param priorityRecord priority record. May be null + * @return a string value + */ + public static String toString(Priority priorityRecord) { + if (priorityRecord==null) { + return "(null)"; + } else { + return String.format(Locale.ENGLISH, + "role %d (locality=%b)", + extractRole(priorityRecord), + hasLocation(priorityRecord.getPriority())); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java new file mode 100644 index 0000000..fafbada 
--- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.state; + +import java.util.List; + +/** + * Interface implemented by anything that must choose containers to release + * + */ +public interface ContainerReleaseSelector { + + /** + * Given a list of candidate containers, return a sorted version of the priority + * in which they should be released. + * @param candidates candidate list ... 
everything considered suitable + * @return the list of candidates + */ + List<RoleInstance> sortCandidates(int roleId, + List<RoleInstance> candidates); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java new file mode 100644 index 0000000..38c5b8e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.state; + +import org.apache.slider.common.tools.Comparators; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * Sort the candidate list by the most recent container first. + */ +public class MostRecentContainerReleaseSelector implements ContainerReleaseSelector { + + @Override + public List<RoleInstance> sortCandidates(int roleId, + List<RoleInstance> candidates) { + Collections.sort(candidates, new newerThan()); + return candidates; + } + + private static class newerThan implements Comparator<RoleInstance>, Serializable { + private final Comparator<Long> innerComparator = + new Comparators.ComparatorReverser<>(new Comparators.LongComparator()); + public int compare(RoleInstance o1, RoleInstance o2) { + return innerComparator.compare(o1.createTime, o2.createTime); + + } + + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java new file mode 100644 index 0000000..eb8ff03 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.state; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.slider.api.types.NodeEntryInformation; + +/** + * Information about the state of a role on a specific node instance. + * No fields are synchronized; sync on the instance to work with it + * <p> + * The two fields `releasing` and `requested` are used to track the ongoing + * state of YARN requests; they do not need to be persisted across stop/start + * cycles. They may be relevant across AM restart, but without other data + * structures in the AM, not enough to track what the AM was up to before + * it was restarted. The strategy will be to ignore unexpected allocation + * responses (which may come from pre-restart) requests, while treating + * unexpected container release responses as failures. + * <p> + * The `active` counter is only decremented after a container release response + * has been received. + * <p> + * + */ +public class NodeEntry implements Cloneable { + + public final int rolePriority; + + public NodeEntry(int rolePriority) { + this.rolePriority = rolePriority; + } + + /** + * instance explicitly requested on this node: it's OK if an allocation + * comes in that has not been (and when that happens, this count should + * not drop). 
+ */ + private int requested; + + /** number of starting instances */ + private int starting; + + /** incrementing counter of instances that failed to start */ + private int startFailed; + + /** incrementing counter of instances that failed */ + private int failed; + + /** + * Counter of "failed recently" events. These are all failures + * which have happened since it was last reset. + */ + private int failedRecently; + + /** incrementing counter of instances that have been pre-empted. */ + private int preempted; + + /** + * Number of live nodes. + */ + private int live; + + /** number of containers being released off this node */ + private int releasing; + + /** timestamp of last use */ + private long lastUsed; + + /** + * Is the node available for assignments? That is, it is + * not running any instances of this type, nor are there + * any requests oustanding for it. + * @return true if a new request could be issued without taking + * the number of instances > 1. + */ + public synchronized boolean isAvailable() { + return live + requested + starting - releasing <= 0; + } + + /** + * Are the anti-affinity constraints held. That is, zero or one + * node running or starting + * @return true if the constraint holds. 
+ */ + public synchronized boolean isAntiAffinityConstraintHeld() { + return (live - releasing + starting) <= 1; + } + + /** + * return no of active instances -those that could be released as they + * are live and not already being released + * @return a number, possibly 0 + */ + public synchronized int getActive() { + return (live - releasing); + } + + /** + * Return true if the node is not busy, and it + * has not been used since the absolute time + * @param absoluteTime time + * @return true if the node could be cleaned up + */ + public synchronized boolean notUsedSince(long absoluteTime) { + return isAvailable() && lastUsed < absoluteTime; + } + + public synchronized int getLive() { + return live; + } + + public int getStarting() { + return starting; + } + + /** + * Set the live value directly -used on AM restart + * @param v value + */ + public synchronized void setLive(int v) { + live = v; + } + + private synchronized void incLive() { + ++live; + } + + private synchronized void decLive() { + live = RoleHistoryUtils.decToFloor(live); + } + + public synchronized void onStarting() { + ++starting; + } + + private void decStarting() { + starting = RoleHistoryUtils.decToFloor(starting); + } + + public synchronized void onStartCompleted() { + decStarting(); + incLive(); + } + + /** + * start failed -decrement the starting flag. + * @return true if the node is now available + */ + public synchronized boolean onStartFailed() { + decStarting(); + ++startFailed; + return containerCompleted(false, ContainerOutcome.Failed); + } + + /** + * no of requests made of this role of this node. 
If it goes above + * 1 there's a problem + */ + public synchronized int getRequested() { + return requested; + } + + /** + * request a node: + */ + public synchronized void request() { + ++requested; + } + + /** + * A request made explicitly to this node has completed + */ + public synchronized void requestCompleted() { + requested = RoleHistoryUtils.decToFloor(requested); + } + + /** + * No of instances in release state + */ + public synchronized int getReleasing() { + return releasing; + } + + /** + * Release an instance -which is no longer marked as active + */ + public synchronized void release() { + releasing++; + } + + /** + * completion event, which can be a planned or unplanned + * planned: dec our release count + * unplanned: dec our live count + * @param wasReleased true if this was planned + * @param outcome + * @return true if this node is now available + */ + public synchronized boolean containerCompleted(boolean wasReleased, ContainerOutcome outcome) { + if (wasReleased) { + releasing = RoleHistoryUtils.decToFloor(releasing); + } else { + // for the node, we use the outcome of the faiure to decide + // whether this is potentially "node-related" + switch(outcome) { + // general "any reason" app failure + case Failed: + // specific node failure + case Node_failure: + + ++failed; + ++failedRecently; + break; + + case Preempted: + preempted++; + break; + + // failures which are node-independent + case Failed_limits_exceeded: + case Completed: + default: + break; + } + } + decLive(); + return isAvailable(); + } + + /** + * Time last used. 
+ */ + public synchronized long getLastUsed() { + return lastUsed; + } + + public synchronized void setLastUsed(long lastUsed) { + this.lastUsed = lastUsed; + } + + public synchronized int getStartFailed() { + return startFailed; + } + + public synchronized int getFailed() { + return failed; + } + + public synchronized int getFailedRecently() { + return failedRecently; + } + + @VisibleForTesting + public synchronized void setFailedRecently(int failedRecently) { + this.failedRecently = failedRecently; + } + + public synchronized int getPreempted() { + return preempted; + } + + + /** + * Reset the failed recently count. + */ + public synchronized void resetFailedRecently() { + failedRecently = 0; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("NodeEntry{"); + sb.append("priority=").append(rolePriority); + sb.append(", requested=").append(requested); + sb.append(", starting=").append(starting); + sb.append(", live=").append(live); + sb.append(", releasing=").append(releasing); + sb.append(", lastUsed=").append(lastUsed); + sb.append(", failedRecently=").append(failedRecently); + sb.append(", preempted=").append(preempted); + sb.append(", startFailed=").append(startFailed); + sb.append('}'); + return sb.toString(); + } + + /** + * Produced a serialized form which can be served up as JSON + * @return a summary of the current role status. 
+ */ + public synchronized NodeEntryInformation serialize() { + NodeEntryInformation info = new NodeEntryInformation(); + info.priority = rolePriority; + info.requested = requested; + info.releasing = releasing; + info.starting = starting; + info.startFailed = startFailed; + info.failed = failed; + info.failedRecently = failedRecently; + info.preempted = preempted; + info.live = live; + info.lastUsed = lastUsed; + return info; + } + + @Override + public Object clone() throws CloneNotSupportedException { + return super.clone(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java new file mode 100644 index 0000000..cc17cf0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.server.appmaster.state;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.slider.api.types.NodeInformation;
import org.apache.slider.common.tools.Comparators;
import org.apache.slider.common.tools.SliderUtils;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;

/**
 * A node instance -stores information about a node in the cluster.
 * <p>
 * Operations on the array/set of roles are synchronized.
 */
public class NodeInstance {

  public final String hostname;

  /**
   * last state of node. Starts off as {@link NodeState#RUNNING},
   * on the assumption that it is live.
   */
  private NodeState nodeState = NodeState.RUNNING;

  /**
   * Last node report. If null: none
   */
  private NodeReport nodeReport = null;

  /**
   * time of state update
   */
  private long nodeStateUpdateTime = 0;

  /**
   * Node labels.
   *
   * IMPORTANT: we assume that there is one label/node, which is the policy
   * for Hadoop as of November 2015
   */
  private String nodeLabels = "";

  /**
   * An unordered list of node entries of specific roles. There's nothing
   * indexed so as to support sparser datastructures.
   */
  private final List<NodeEntry> nodeEntries;

  /**
   * Create an instance and the (empty) list of role entries.
   * @param hostname hostname of the node
   * @param roles role count -the no. of roles; only used as the initial
   * capacity of the entry list
   */
  public NodeInstance(String hostname, int roles) {
    this.hostname = hostname;
    nodeEntries = new ArrayList<>(roles);
  }

  /**
   * Update the node status.
   * The return code is true if the node state changed enough to
   * trigger a re-evaluation of pending requests. That is, either a node
   * became available when it was previously not, or the label changed
   * on an available node.
   *
   * Transitions of a node from live to dead aren't treated as significant,
   * nor label changes on a dead node.
   *
   * @param report latest node report
   * @return true if the node state changed enough for a request evaluation.
   */
  public synchronized boolean updateNode(NodeReport report) {
    nodeStateUpdateTime = report.getLastHealthReportTime();
    nodeReport = report;
    NodeState oldState = nodeState;
    boolean oldStateUnusable = oldState.isUnusable();
    nodeState = report.getNodeState();
    boolean newUsable = !nodeState.isUnusable();
    boolean nodeNowAvailable = oldStateUnusable && newUsable;
    String oldLabels = nodeLabels;
    nodeLabels = SliderUtils.extractNodeLabel(report);
    // null-safe comparison: the contract of extractNodeLabel() is not
    // visible here, and a null label must not surface as an NPE
    return nodeNowAvailable
        || newUsable && !Objects.equals(nodeLabels, oldLabels);
  }

  /**
   * Get the node label extracted from the last node report.
   * @return the label; "" until a report has been received
   */
  public String getNodeLabels() {
    return nodeLabels;
  }

  /**
   * Get the entry for a role -if present
   * @param role role index
   * @return the entry, or null if there is no entry for that role
   */
  public synchronized NodeEntry get(int role) {
    for (NodeEntry nodeEntry : nodeEntries) {
      if (nodeEntry.rolePriority == role) {
        return nodeEntry;
      }
    }
    return null;
  }

  /**
   * Get the entry for a role, creating and adding one if absent.
   * @param role role index
   * @return the existing entry, or a newly created one which has been
   * added to this node
   */
  public synchronized NodeEntry getOrCreate(int role) {
    NodeEntry entry = get(role);
    if (entry == null) {
      entry = new NodeEntry(role);
      nodeEntries.add(entry);
    }
    return entry;
  }

  /**
   * Get the node entry matching a container on this node
   * @param container container
   * @return matching node entry for the role extracted from the container
   */
  public NodeEntry getOrCreate(Container container) {
    return getOrCreate(ContainerPriority.extractRole(container));
  }

  /**
   * Count the number of active role instances on this node
   * @param role role index
   * @return 0 if there are none, otherwise the #of nodes that are running and
   * not being released already.
   */
  public int getActiveRoleInstances(int role) {
    NodeEntry nodeEntry = get(role);
    return (nodeEntry != null) ? nodeEntry.getActive() : 0;
  }

  /**
   * Count the number of live role instances on this node
   * @param role role index
   * @return 0 if there are none, otherwise the #of nodes that are running
   */
  public int getLiveRoleInstances(int role) {
    NodeEntry nodeEntry = get(role);
    return (nodeEntry != null) ? nodeEntry.getLive() : 0;
  }

  /**
   * Is the node considered online
   * @return true if the last known node state is not unusable
   */
  public boolean isOnline() {
    return !nodeState.isUnusable();
  }

  /**
   * Query for a node being considered unreliable
   * @param role role key
   * @param threshold threshold above which a node is considered unreliable
   * @return true if the node is considered unreliable
   */
  public boolean isConsideredUnreliable(int role, int threshold) {
    NodeEntry entry = get(role);
    return entry != null && entry.getFailedRecently() > threshold;
  }

  /**
   * Get the entry for a role -and remove it if present
   * @param role the role index
   * @return the entry that WAS there, or null
   */
  public synchronized NodeEntry remove(int role) {
    NodeEntry nodeEntry = get(role);
    if (nodeEntry != null) {
      nodeEntries.remove(nodeEntry);
    }
    return nodeEntry;
  }

  /**
   * Replace any entry for a role with the supplied one.
   * @param role role index of the entry to remove
   * @param nodeEntry entry to add
   */
  public synchronized void set(int role, NodeEntry nodeEntry) {
    remove(role);
    nodeEntries.add(nodeEntry);
  }

  /**
   * run through each entry; gc'ing & removing old ones that don't have
   * a recent failure count (we care about those)
   * @param absoluteTime time in millis, passed to
   * {@code NodeEntry.notUsedSince()}
   * @return true if there are still entries left
   */
  public synchronized boolean purgeUnusedEntries(long absoluteTime) {
    boolean active = false;
    ListIterator<NodeEntry> entries = nodeEntries.listIterator();
    while (entries.hasNext()) {
      NodeEntry entry = entries.next();
      if (entry.notUsedSince(absoluteTime) && entry.getFailedRecently() == 0) {
        entries.remove();
      } else {
        active = true;
      }
    }
    return active;
  }


  /**
   * run through each entry resetting the failure count
   */
  public synchronized void resetFailedRecently() {
    for (NodeEntry entry : nodeEntries) {
      entry.resetFailedRecently();
    }
  }

  @Override
  public String toString() {
    return hostname;
  }

  /**
   * Full dump of entry including children.
   * Synchronized, like the other operations on the role entries, so the
   * iteration cannot race with concurrent modification.
   * @return a multi-line description of the node
   */
  public synchronized String toFullString() {
    final StringBuilder sb =
        new StringBuilder(toString());
    sb.append("{ ");
    for (NodeEntry entry : nodeEntries) {
      sb.append(String.format("\n [%02d] ", entry.rolePriority));
      sb.append(entry.toString());
    }
    sb.append("} ");
    return sb.toString();
  }

  /**
   * Equality test is purely on the hostname of the node address
   * @param o other
   * @return true if the hostnames are equal
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    NodeInstance that = (NodeInstance) o;
    return hostname.equals(that.hostname);
  }

  @Override
  public int hashCode() {
    return hostname.hashCode();
  }


  /**
   * Predicate to query if the number of recent failures of a role
   * on this node exceeds that role's failure threshold.
   * If there is no record of a deployment of that role on this
   * node, the failure count is taken as "0".
   * A negative threshold makes this always return true.
   * @param role role to look up
   * @return true if the failure rate is above the threshold.
   */
  public boolean exceedsFailureThreshold(RoleStatus role) {
    NodeEntry entry = get(role.getKey());
    int numFailuresOnLastHost = entry != null ? entry.getFailedRecently() : 0;
    int failureThreshold = role.getNodeFailureThreshold();
    return failureThreshold < 0 || numFailuresOnLastHost > failureThreshold;
  }

  /**
   * Produce a serialized form which can be served up as JSON
   * @param naming map of priority -> value for naming entries; entries
   * without a name are keyed by their priority number
   * @return a summary of the current role status.
   */
  public synchronized NodeInformation serialize(Map<Integer, String> naming) {
    NodeInformation info = new NodeInformation();
    info.hostname = hostname;
    // string concatenation tolerates a null state
    info.state = "" + nodeState;
    info.lastUpdated = nodeStateUpdateTime;
    info.labels = nodeLabels;
    if (nodeReport != null) {
      info.httpAddress = nodeReport.getHttpAddress();
      info.rackName = nodeReport.getRackName();
      info.healthReport = nodeReport.getHealthReport();
    }
    info.entries = new HashMap<>(nodeEntries.size());
    for (NodeEntry nodeEntry : nodeEntries) {
      String name = naming.get(nodeEntry.rolePriority);
      if (name == null) {
        name = Integer.toString(nodeEntry.rolePriority);
      }
      info.entries.put(name, nodeEntry.serialize());
    }
    return info;
  }

  /**
   * Is this node instance a suitable candidate for the specific role?
   * Note: this uses {@link #getOrCreate(int)}, so querying a node creates
   * an entry for the role if none existed.
   * @param role role ID
   * @param label label which must match, or "" for no label checks
   * @return true if the node has space for this role, is running and the labels
   * match.
   */
  public boolean canHost(int role, String label) {
    return isOnline()
        && (SliderUtils.isUnset(label) || label.equals(nodeLabels)) // label match
        && getOrCreate(role).isAvailable(); // no live role
  }

  /**
   * A comparator for sorting entries where the node is preferred over another.
   *
   * The exact algorithm may change: current policy is "most recent first", so
   * sorted on the lastUsed field. Ordering is delegated to
   * {@link Comparators.InvertedLongComparator}; a node with no entry for the
   * role is treated as having a lastUsed value of -1.
   */
  public static class Preferred implements Comparator<NodeInstance>, Serializable {

    private static final Comparators.InvertedLongComparator comparator =
        new Comparators.InvertedLongComparator();
    private final int role;

    public Preferred(int role) {
      this.role = role;
    }

    @Override
    public int compare(NodeInstance o1, NodeInstance o2) {
      NodeEntry left = o1.get(role);
      NodeEntry right = o2.get(role);
      long ageL = left != null ? left.getLastUsed() : -1;
      long ageR = right != null ? right.getLastUsed() : -1;
      return comparator.compare(ageL, ageR);
    }
  }

  /**
   * A comparator which orders nodes by their number of active instances
   * of a role, most active first.
   * Only {@link NodeInstance#getActiveRoleInstances(int)} is compared; the
   * lastUsed field is not consulted.
   */
  public static class MoreActiveThan implements Comparator<NodeInstance>,
      Serializable {

    private final int role;

    public MoreActiveThan(int role) {
      this.role = role;
    }

    @Override
    public int compare(NodeInstance left, NodeInstance right) {
      int activeLeft = left.getActiveRoleInstances(role);
      int activeRight = right.getActiveRoleInstances(role);
      // arguments reversed for descending order; Integer.compare avoids
      // the overflow hazard of subtraction
      return Integer.compare(activeRight, activeLeft);
    }
  }

  /**
   * A comparator for sorting entries alphabetically by hostname
   */
  public static class CompareNames implements Comparator<NodeInstance>,
      Serializable {

    public CompareNames() {
    }

    @Override
    public int compare(NodeInstance left, NodeInstance right) {
      return left.hostname.compareTo(right.hostname);
    }
  }


}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java new file mode 100644 index 0000000..3858b68 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.state; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Node map map -and methods to work with it. + * Not Synchronized: caller is expected to lock access. 
+ */ +public class NodeMap extends HashMap<String, NodeInstance> { + protected static final Logger log = + LoggerFactory.getLogger(NodeMap.class); + + /** + * number of roles + */ + private final int roleSize; + + /** + * Construct + * @param roleSize number of roles + */ + public NodeMap(int roleSize) { + this.roleSize = roleSize; + } + + /** + * Get the node instance for the specific node -creating it if needed + * @param hostname node + * @return the instance + */ + public NodeInstance getOrCreate(String hostname) { + NodeInstance node = get(hostname); + if (node == null) { + node = new NodeInstance(hostname, roleSize); + put(hostname, node); + } + return node; + } + + /** + * List the active nodes + * @param role role + * @return a possibly empty sorted list of all nodes that are active + * in that role + */ + public List<NodeInstance> listActiveNodes(int role) { + List<NodeInstance> nodes = new ArrayList<>(); + for (NodeInstance instance : values()) { + if (instance.getActiveRoleInstances(role) > 0) { + nodes.add(instance); + } + } + Collections.sort(nodes, new NodeInstance.MoreActiveThan(role)); + return nodes; + } + + /** + * reset the failed recently counters + */ + public void resetFailedRecently() { + for (Map.Entry<String, NodeInstance> entry : entrySet()) { + NodeInstance ni = entry.getValue(); + ni.resetFailedRecently(); + } + } + + /** + * Update the node state. Return true if the node state changed: either by + * being created, or by changing its internal state as defined + * by {@link NodeInstance#updateNode(NodeReport)}. + * + * @param hostname host name + * @param report latest node report + * @return true if the node state changed enough for a request evaluation. 
+ */ + public boolean updateNode(String hostname, NodeReport report) { + boolean nodeExisted = get(hostname) != null; + boolean updated = getOrCreate(hostname).updateNode(report); + return updated || !nodeExisted; + } + + /** + * Clone point + * @return a shallow clone + */ + @Override + public Object clone() { + return super.clone(); + } + + /** + * Insert a list of nodes into the map; overwrite any with that name + * This is a bulk operation for testing. + * @param nodes collection of nodes. + */ + @VisibleForTesting + public void insert(Collection<NodeInstance> nodes) { + for (NodeInstance node : nodes) { + put(node.hostname, node); + } + } + + /** + * Test helper: build or update a cluster from a list of node reports + * @param reports the list of reports + * @return true if this has been considered to have changed the cluster + */ + @VisibleForTesting + public boolean buildOrUpdate(List<NodeReport> reports) { + boolean updated = false; + for (NodeReport report : reports) { + updated |= getOrCreate(report.getNodeId().getHost()).updateNode(report); + } + return updated; + } + + /** + * Scan the current node map for all nodes capable of hosting an instance + * @param role role ID + * @param label label which must match, or "" for no label checks + * @return a possibly empty list of node instances matching the criteria. 
+ */ + public List<NodeInstance> findAllNodesForRole(int role, String label) { + List<NodeInstance> nodes = new ArrayList<>(size()); + for (NodeInstance instance : values()) { + if (instance.canHost(role, label)) { + nodes.add(instance); + } + } + Collections.sort(nodes, new NodeInstance.CompareNames()); + return nodes; + } + + @Override + public synchronized String toString() { + final StringBuilder sb = new StringBuilder("NodeMap{"); + List<String> keys = new ArrayList<>(keySet()); + Collections.sort(keys); + for (String key : keys) { + sb.append(key).append(": "); + sb.append(get(key).toFullString()).append("\n"); + } + sb.append('}'); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java new file mode 100644 index 0000000..4357ef8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.state; + + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.server.appmaster.operations.CancelSingleRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tracks an outstanding request. This is used to correlate an allocation response + * with the node and role used in the request. + * <p> + * The node identifier may be null -which indicates that a request was made without + * a specific target node + * <p> + * Equality and the hash code are based <i>only</i> on the role and hostname, + * which are fixed in the constructor. 
This means that a simple + * instance constructed with (role, hostname) can be used to look up + * a complete request instance in the {@link OutstandingRequestTracker} map + */ +public final class OutstandingRequest extends RoleHostnamePair { + protected static final Logger log = + LoggerFactory.getLogger(OutstandingRequest.class); + + /** + * Node the request is for -may be null + */ + public final NodeInstance node; + + /** + * A list of all possible nodes to list in an AA request. For a non-AA + * request where {@link #node} is set, element 0 of the list is the same + * value. + */ + public final List<NodeInstance> nodes = new ArrayList<>(1); + + /** + * Optional label. This is cached as the request option (explicit-location + label) is forbidden, + * yet the label needs to be retained for escalation. + */ + public String label; + + /** + * Requested time in millis. + * <p> + * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)} + */ + private AMRMClient.ContainerRequest issuedRequest; + + /** + * Requested time in millis. + * <p> + * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)} + */ + private long requestedTimeMillis; + + /** + * Time in millis after which escalation should be triggered.. + * <p> + * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)} + */ + private long escalationTimeoutMillis; + + /** + * Has the placement request been escalated? + */ + private boolean escalated; + + /** + * Flag to indicate that escalation is allowed + */ + private boolean mayEscalate; + + /** + * Priority of request; only valid after the request is built up + */ + private int priority = -1; + + /** + * Is this an Anti-affine request which should be cancelled on + * a cluster resize? 
+ */ + private boolean antiAffine = false; + + /** + * Create a request + * @param roleId role + * @param node node -can be null + */ + public OutstandingRequest(int roleId, + NodeInstance node) { + super(roleId, node != null ? node.hostname : null); + this.node = node; + nodes.add(node); + } + + /** + * Create an outstanding request with the given role and hostname + * Important: this is useful only for map lookups -the other constructor + * with the NodeInstance parameter is needed to generate node-specific + * container requests + * @param roleId role + * @param hostname hostname + */ + public OutstandingRequest(int roleId, String hostname) { + super(roleId, hostname); + this.node = null; + } + + /** + * Create an Anti-affine reques, including all listed nodes (there must be one) + * as targets. + * @param roleId role + * @param nodes list of nodes + */ + public OutstandingRequest(int roleId, List<NodeInstance> nodes) { + super(roleId, nodes.get(0).hostname); + this.node = null; + this.antiAffine = true; + this.nodes.addAll(nodes); + } + + /** + * Is the request located in the cluster, that is: does it have a node. + * @return true if a node instance was supplied in the constructor + */ + public boolean isLocated() { + return node != null; + } + + public long getRequestedTimeMillis() { + return requestedTimeMillis; + } + + public long getEscalationTimeoutMillis() { + return escalationTimeoutMillis; + } + + public synchronized boolean isEscalated() { + return escalated; + } + + public boolean mayEscalate() { + return mayEscalate; + } + + public AMRMClient.ContainerRequest getIssuedRequest() { + return issuedRequest; + } + + public int getPriority() { + return priority; + } + + public boolean isAntiAffine() { + return antiAffine; + } + + public void setAntiAffine(boolean antiAffine) { + this.antiAffine = antiAffine; + } + + /** + * Build a container request. + * <p> + * The value of {@link #node} is used to direct a lot of policy. 
If null, + * placement is relaxed. + * If not null, the choice of whether to use the suggested node + * is based on the placement policy and failure history. + * <p> + * If the request has an address, it is set in the container request + * (with a flag to enable relaxed priorities). + * <p> + * This operation sets the requested time flag, used for tracking timeouts + * on outstanding requests + * @param resource resource + * @param role role + * @param time time in millis to record as request time + * @return the request to raise + */ + public synchronized AMRMClient.ContainerRequest buildContainerRequest( + Resource resource, RoleStatus role, long time) { + Preconditions.checkArgument(resource != null, "null `resource` arg"); + Preconditions.checkArgument(role != null, "null `role` arg"); + + // cache label for escalation + label = role.getLabelExpression(); + requestedTimeMillis = time; + escalationTimeoutMillis = time + role.getPlacementTimeoutSeconds() * 1000; + String[] hosts; + boolean relaxLocality; + boolean strictPlacement = role.isStrictPlacement(); + NodeInstance target = this.node; + String nodeLabels; + + if (isAntiAffine()) { + int size = nodes.size(); + log.info("Creating anti-affine request across {} nodes; first node = {}", + size, hostname); + hosts = new String[size]; + StringBuilder builder = new StringBuilder(size * 16); + int c = 0; + for (NodeInstance nodeInstance : nodes) { + hosts[c++] = nodeInstance.hostname; + builder.append(nodeInstance.hostname).append(" "); + } + log.debug("Full host list: [ {}]", builder); + escalated = false; + mayEscalate = false; + relaxLocality = false; + nodeLabels = null; + } else if (target != null) { + // placed request. 
Hostname is used in request + hosts = new String[1]; + hosts[0] = target.hostname; + // and locality flag is set to false; Slider will decide when + // to relax things + relaxLocality = false; + + log.info("Submitting request for container on {}", hosts[0]); + // enable escalation for all but strict placements. + escalated = false; + mayEscalate = !strictPlacement; + nodeLabels = null; + } else { + // no hosts + hosts = null; + // relax locality is mandatory on an unconstrained placement + relaxLocality = true; + // declare that the the placement is implicitly escalated. + escalated = true; + // and forbid it happening + mayEscalate = false; + nodeLabels = label; + } + Priority pri = ContainerPriority.createPriority(roleId, !relaxLocality); + priority = pri.getPriority(); + issuedRequest = new AMRMClient.ContainerRequest(resource, + hosts, + null, + pri, + relaxLocality, + nodeLabels); + validate(); + return issuedRequest; + } + + + /** + * Build an escalated container request, updating {@link #issuedRequest} with + * the new value. + * @return the new container request, which has the same resource and label requirements + * as the original one, and the same host, but: relaxed placement, and a changed priority + * so as to place it into the relaxed list. 
+ */ + public synchronized AMRMClient.ContainerRequest escalate() { + Preconditions.checkNotNull(issuedRequest, "cannot escalate if request not issued " + this); + log.debug("Escalating {}", this.toString()); + escalated = true; + + // this is now the priority + // it is tagged as unlocated because it needs to go into a different + // set of outstanding requests from the strict placements + Priority pri = ContainerPriority.createPriority(roleId, false); + // update the field + priority = pri.getPriority(); + + String[] nodes; + List<String> issuedRequestNodes = issuedRequest.getNodes(); + if (SliderUtils.isUnset(label) && issuedRequestNodes != null) { + nodes = issuedRequestNodes.toArray(new String[issuedRequestNodes.size()]); + } else { + nodes = null; + } + + issuedRequest = new AMRMClient.ContainerRequest(issuedRequest.getCapability(), + nodes, + null, + pri, + true, + label); + validate(); + return issuedRequest; + } + + /** + * Mark the request as completed (or canceled). + * <p> + * Current action: if a node is defined, its request count is decremented + */ + public void completed() { + if (node != null) { + node.getOrCreate(roleId).requestCompleted(); + } + } + + /** + * Query to see if the request is available and ready to be escalated + * @param time time to check against + * @return true if escalation should begin + */ + public synchronized boolean shouldEscalate(long time) { + return mayEscalate + && !escalated + && issuedRequest != null + && escalationTimeoutMillis < time; + } + + /** + * Query for the resource requirements matching; always false before a request is issued + * @param resource + * @return + */ + public synchronized boolean resourceRequirementsMatch(Resource resource) { + return issuedRequest != null && Resources.fitsIn(issuedRequest.getCapability(), resource); + } + + @Override + public String toString() { + boolean requestHasLocation = ContainerPriority.hasLocation(getPriority()); + final StringBuilder sb = new 
StringBuilder("OutstandingRequest{"); + sb.append("roleId=").append(roleId); + if (hostname != null) { + sb.append(", hostname='").append(hostname).append('\''); + } + sb.append(", node=").append(node); + sb.append(", hasLocation=").append(requestHasLocation); + sb.append(", label=").append(label); + sb.append(", requestedTimeMillis=").append(requestedTimeMillis); + sb.append(", mayEscalate=").append(mayEscalate); + sb.append(", escalated=").append(escalated); + sb.append(", escalationTimeoutMillis=").append(escalationTimeoutMillis); + sb.append(", issuedRequest=").append( + issuedRequest != null ? SliderUtils.requestToString(issuedRequest) : "(null)"); + sb.append('}'); + return sb.toString(); + } + + /** + * Create a cancel operation + * @return an operation that can be used to cancel the request + */ + public CancelSingleRequest createCancelOperation() { + Preconditions.checkState(issuedRequest != null, "No issued request to cancel"); + return new CancelSingleRequest(issuedRequest); + } + + /** + * Valid if a node label expression specified on container request is valid or + * not. 
Mimics the logic in AMRMClientImpl, so can be used for preflight checking + * and in mock tests + * + */ + public void validate() throws InvalidContainerRequestException { + Preconditions.checkNotNull(issuedRequest, "request has not yet been built up"); + AMRMClient.ContainerRequest containerRequest = issuedRequest; + String requestDetails = this.toString(); + validateContainerRequest(containerRequest, priority, requestDetails); + } + + /** + * Inner Validation logic for container request + * @param containerRequest request + * @param priority raw priority of role + * @param requestDetails details for error messages + */ + @VisibleForTesting + public static void validateContainerRequest(AMRMClient.ContainerRequest containerRequest, + int priority, String requestDetails) { + String exp = containerRequest.getNodeLabelExpression(); + boolean hasRacks = containerRequest.getRacks() != null && + (!containerRequest.getRacks().isEmpty()); + boolean hasNodes = containerRequest.getNodes() != null && + (!containerRequest.getNodes().isEmpty()); + + boolean hasLabel = SliderUtils.isSet(exp); + + // Don't support specifying >= 2 node labels in a node label expression now + if (hasLabel && (exp.contains("&&") || exp.contains("||"))) { + throw new InvalidContainerRequestException( + "Cannot specify more than two node labels" + + " in a single node label expression: " + requestDetails); + } + + // Don't allow specify node label against ANY request listing hosts or racks + if (hasLabel && ( hasRacks || hasNodes)) { + throw new InvalidContainerRequestException( + "Cannot specify node label with rack or node: " + requestDetails); + } + } + + /** + * Create a new role/hostname pair for indexing. + * @return a new index. 
+ */ + public RoleHostnamePair getIndex() { + return new RoleHostnamePair(roleId, hostname); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java new file mode 100644 index 0000000..c16aa3c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.state; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; +import org.apache.slider.server.appmaster.operations.CancelSingleRequest; +import org.apache.slider.server.appmaster.operations.ContainerRequestOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; + +/** + * Tracks outstanding requests made with a specific placement option. + * <p> + * <ol> + * <li>Used to decide when to return a node to 'can request containers here' list</li> + * <li>Used to identify requests where placement has timed out, and so issue relaxed requests</li> + * </ol> + * <p> + * If an allocation comes in that is not in the map: either the allocation + * was unplaced, or the placed allocation could not be met on the specified + * host, and the RM/scheduler fell back to another location. + */ + +public class OutstandingRequestTracker { + protected static final Logger log = + LoggerFactory.getLogger(OutstandingRequestTracker.class); + + /** + * no requests; saves creating a new list if not needed + */ + private final List<AbstractRMOperation> NO_REQUESTS = new ArrayList<>(0); + + private Map<RoleHostnamePair, OutstandingRequest> placedRequests = new HashMap<>(); + + /** + * List of open requests; no specific details on them. 
+ */ + private List<OutstandingRequest> openRequests = new ArrayList<>(); + + /** + * Create a new request for the specific role. + * <p> + * If a location is set, the request is added to {@link #placedRequests}. + * If not, it is added to {@link #openRequests} + * <p> + * This does not update the node instance's role's request count + * @param instance node instance to manager + * @param role role index + * @return a new request + */ + public synchronized OutstandingRequest newRequest(NodeInstance instance, int role) { + OutstandingRequest request = new OutstandingRequest(role, instance); + if (request.isLocated()) { + placedRequests.put(request.getIndex(), request); + } else { + openRequests.add(request); + } + return request; + } + + /** + * Create a new Anti-affine request for the specific role + * <p> + * It is added to {@link #openRequests} + * <p> + * This does not update the node instance's role's request count + * @param role role index + * @param nodes list of suitable nodes + * @param label label to use + * @return a new request + */ + public synchronized OutstandingRequest newAARequest(int role, + List<NodeInstance> nodes, + String label) { + Preconditions.checkArgument(!nodes.isEmpty()); + // safety check to verify the allocation will hold + for (NodeInstance node : nodes) { + Preconditions.checkState(node.canHost(role, label), + "Cannot allocate role ID %d to node %s", role, node); + } + OutstandingRequest request = new OutstandingRequest(role, nodes); + openRequests.add(request); + return request; + } + + /** + * Look up any oustanding request to a (role, hostname). 
+ * @param role role index + * @param hostname hostname + * @return the request or null if there was no outstanding one in the {@link #placedRequests} + */ + @VisibleForTesting + public synchronized OutstandingRequest lookupPlacedRequest(int role, String hostname) { + Preconditions.checkArgument(hostname != null, "null hostname"); + return placedRequests.get(new RoleHostnamePair(role, hostname)); + } + + /** + * Remove a request + * @param request matching request to find + * @return the request or null for no match in the {@link #placedRequests} + */ + @VisibleForTesting + public synchronized OutstandingRequest removePlacedRequest(OutstandingRequest request) { + return placedRequests.remove(request); + } + + /** + * Notification that a container has been allocated + * + * <ol> + * <li>drop it from the {@link #placedRequests} structure.</li> + * <li>generate the cancellation request</li> + * <li>for AA placement, any actions needed</li> + * </ol> + * + * @param role role index + * @param hostname hostname + * @return the allocation outcome + */ + public synchronized ContainerAllocationResults onContainerAllocated(int role, + String hostname, + Container container) { + final String containerDetails = SliderUtils.containerToString(container); + log.debug("Processing allocation for role {} on {}", role, + containerDetails); + ContainerAllocationResults allocation = new ContainerAllocationResults(); + ContainerAllocationOutcome outcome; + OutstandingRequest request = placedRequests.remove(new OutstandingRequest(role, hostname)); + if (request != null) { + //satisfied request + log.debug("Found oustanding placed request for container: {}", request); + request.completed(); + // derive outcome from status of tracked request + outcome = request.isEscalated() + ? 
ContainerAllocationOutcome.Escalated + : ContainerAllocationOutcome.Placed; + } else { + // not in the list; this is an open placement + // scan through all containers in the open request list + request = removeOpenRequest(container); + if (request != null) { + log.debug("Found open outstanding request for container: {}", request); + request.completed(); + outcome = ContainerAllocationOutcome.Open; + } else { + log.warn("No oustanding request found for container {}, outstanding queue has {} entries ", + containerDetails, + openRequests.size()); + outcome = ContainerAllocationOutcome.Unallocated; + } + } + if (request != null && request.getIssuedRequest() != null) { + allocation.operations.add(request.createCancelOperation()); + } else { + // there's a request, but no idea what to cancel. + // rather than try to recover from it inelegantly, (and cause more confusion), + // log the event, but otherwise continue + log.warn("Unexpected allocation of container " + SliderUtils.containerToString(container)); + } + + allocation.origin = request; + allocation.outcome = outcome; + return allocation; + } + + /** + * Find and remove an open request. Determine it by scanning open requests + * for one whose priority & resource requirements match that of the container + * allocated. 
+ * @param container container allocated + * @return a request which matches the allocation, or null for "no match" + */ + private OutstandingRequest removeOpenRequest(Container container) { + int pri = container.getPriority().getPriority(); + Resource resource = container.getResource(); + OutstandingRequest request = null; + ListIterator<OutstandingRequest> openlist = openRequests.listIterator(); + while (openlist.hasNext() && request == null) { + OutstandingRequest r = openlist.next(); + if (r.getPriority() == pri) { + // matching resource + if (r.resourceRequirementsMatch(resource)) { + // match of priority and resources + request = r; + openlist.remove(); + } else { + log.debug("Matched priorities but resources different"); + } + } + } + return request; + } + + /** + * Determine which host was a role type most recently used on, so that + * if a choice is made of which (potentially surplus) containers to use, + * the most recent one is picked first. This operation <i>does not</i> + * change the role history, though it queries it. + */ + static class newerThan implements Comparator<Container>, Serializable { + private RoleHistory rh; + + public newerThan(RoleHistory rh) { + this.rh = rh; + } + + /** + * Get the age of a node hosting container. If it is not known in the history, + * return 0. + * @param c container + * @return age, null if there's no entry for it. + */ + private long getAgeOf(Container c) { + long age = 0; + NodeInstance node = rh.getExistingNodeInstance(c); + int role = ContainerPriority.extractRole(c); + if (node != null) { + NodeEntry nodeEntry = node.get(role); + if (nodeEntry != null) { + age = nodeEntry.getLastUsed(); + } + } + return age; + } + + /** + * Comparator: which host is more recent? 
+ * @param c1 container 1 + * @param c2 container 2 + * @return 1 if c2 older-than c1, 0 if equal; -1 if c1 older-than c2 + */ + @Override + public int compare(Container c1, Container c2) { + int role1 = ContainerPriority.extractRole(c1); + int role2 = ContainerPriority.extractRole(c2); + if (role1 < role2) return -1; + if (role1 > role2) return 1; + + long age = getAgeOf(c1); + long age2 = getAgeOf(c2); + + if (age > age2) { + return -1; + } else if (age < age2) { + return 1; + } + // equal + return 0; + } + } + + /** + * Take a list of requests and split them into specific host requests and + * generic assignments. This is to give requested hosts priority + * in container assignments if more come back than expected + * @param rh RoleHistory instance + * @param inAllocated the list of allocated containers + * @param outPlaceRequested initially empty list of requested locations + * @param outUnplaced initially empty list of unrequested hosts + */ + public synchronized void partitionRequests(RoleHistory rh, + List<Container> inAllocated, + List<Container> outPlaceRequested, + List<Container> outUnplaced) { + Collections.sort(inAllocated, new newerThan(rh)); + for (Container container : inAllocated) { + int role = ContainerPriority.extractRole(container); + String hostname = RoleHistoryUtils.hostnameOf(container); + if (placedRequests.containsKey(new OutstandingRequest(role, hostname))) { + outPlaceRequested.add(container); + } else { + outUnplaced.add(container); + } + } + } + + + /** + * Reset list all outstanding requests for a role: return the hostnames + * of any canceled requests + * + * @param role role to cancel + * @return possibly empty list of hostnames + */ + public synchronized List<NodeInstance> resetOutstandingRequests(int role) { + List<NodeInstance> hosts = new ArrayList<>(); + Iterator<Map.Entry<RoleHostnamePair, OutstandingRequest>> iterator = + placedRequests.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<RoleHostnamePair, 
OutstandingRequest> next = + iterator.next(); + OutstandingRequest request = next.getValue(); + if (request.roleId == role) { + iterator.remove(); + request.completed(); + hosts.add(request.node); + } + } + ListIterator<OutstandingRequest> openlist = openRequests.listIterator(); + while (openlist.hasNext()) { + OutstandingRequest next = openlist.next(); + if (next.roleId == role) { + openlist.remove(); + } + } + return hosts; + } + + /** + * Get a list of outstanding requests. The list is cloned, but the contents + * are shared + * @return a list of the current outstanding requests + */ + public synchronized List<OutstandingRequest> listPlacedRequests() { + return new ArrayList<>(placedRequests.values()); + } + + /** + * Get a list of outstanding requests. The list is cloned, but the contents + * are shared + * @return a list of the current outstanding requests + */ + public synchronized List<OutstandingRequest> listOpenRequests() { + return new ArrayList<>(openRequests); + } + + /** + * Escalate operation as triggered by external timer. + * @return a (usually empty) list of cancel/request operations. 
+ */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + public synchronized List<AbstractRMOperation> escalateOutstandingRequests(long now) { + if (placedRequests.isEmpty()) { + return NO_REQUESTS; + } + + List<AbstractRMOperation> operations = new ArrayList<>(); + for (OutstandingRequest outstandingRequest : placedRequests.values()) { + synchronized (outstandingRequest) { + // sync escalation check with operation so that nothing can happen to state + // of the request during the escalation + if (outstandingRequest.shouldEscalate(now)) { + + // time to escalate + CancelSingleRequest cancel = outstandingRequest.createCancelOperation(); + operations.add(cancel); + AMRMClient.ContainerRequest escalated = outstandingRequest.escalate(); + operations.add(new ContainerRequestOperation(escalated)); + } + } + + } + return operations; + } + + /** + * Cancel all outstanding AA requests from the lists of requests. + * + * This does not remove them from the role status; they must be reset + * by the caller. 
+ * + */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() { + + log.debug("Looking for AA request to cancel"); + List<AbstractRMOperation> operations = new ArrayList<>(); + + // first, all placed requests + List<RoleHostnamePair> requestsToRemove = new ArrayList<>(placedRequests.size()); + for (Map.Entry<RoleHostnamePair, OutstandingRequest> entry : placedRequests.entrySet()) { + OutstandingRequest outstandingRequest = entry.getValue(); + synchronized (outstandingRequest) { + if (outstandingRequest.isAntiAffine()) { + // time to escalate + operations.add(outstandingRequest.createCancelOperation()); + requestsToRemove.add(entry.getKey()); + } + } + } + for (RoleHostnamePair keys : requestsToRemove) { + placedRequests.remove(keys); + } + + // second, all open requests + ListIterator<OutstandingRequest> orit = openRequests.listIterator(); + while (orit.hasNext()) { + OutstandingRequest outstandingRequest = orit.next(); + synchronized (outstandingRequest) { + if (outstandingRequest.isAntiAffine()) { + // time to escalate + operations.add(outstandingRequest.createCancelOperation()); + orit.remove(); + } + } + } + log.info("Cancelling {} outstanding AA requests", operations.size()); + + return operations; + } + + /** + * Extract a specific number of open requests for a role + * @param roleId role Id + * @param count count to extract + * @return a list of requests which are no longer in the open request list + */ + public synchronized List<OutstandingRequest> extractOpenRequestsForRole(int roleId, int count) { + List<OutstandingRequest> results = new ArrayList<>(); + ListIterator<OutstandingRequest> openlist = openRequests.listIterator(); + while (openlist.hasNext() && count > 0) { + OutstandingRequest openRequest = openlist.next(); + if (openRequest.roleId == roleId) { + results.add(openRequest); + openlist.remove(); + count--; + } + } + return results; + } + + /** + * 
Extract a specific number of placed requests for a role + * @param roleId role Id + * @param count count to extract + * @return a list of requests which are no longer in the placed request data structure + */ + public synchronized List<OutstandingRequest> extractPlacedRequestsForRole(int roleId, int count) { + List<OutstandingRequest> results = new ArrayList<>(); + Iterator<Map.Entry<RoleHostnamePair, OutstandingRequest>> + iterator = placedRequests.entrySet().iterator(); + while (iterator.hasNext() && count > 0) { + OutstandingRequest request = iterator.next().getValue(); + if (request.roleId == roleId) { + results.add(request); + count--; + } + } + // now cull them from the map + for (OutstandingRequest result : results) { + placedRequests.remove(result); + } + + return results; + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org