YARN-7669. API and interface modifications for placement constraint processor. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d70b0467 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d70b0467 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d70b0467 Branch: refs/heads/YARN-6592 Commit: d70b0467c4e5c5017f81064f71efbb7e7be4f774 Parents: 3967631 Author: Arun Suresh <asur...@apache.org> Authored: Tue Dec 19 22:47:46 2017 -0800 Committer: Arun Suresh <asur...@apache.org> Committed: Wed Jan 17 13:49:57 2018 -0800 ---------------------------------------------------------------------- .../yarn/ams/ApplicationMasterServiceUtils.java | 16 + .../api/protocolrecords/AllocateResponse.java | 23 + .../api/records/RejectedSchedulingRequest.java | 70 +++ .../yarn/api/records/RejectionReason.java | 44 ++ .../src/main/proto/yarn_protos.proto | 10 + .../src/main/proto/yarn_service_protos.proto | 1 + .../impl/pb/AllocateResponsePBImpl.java | 85 ++++ .../yarn/api/records/impl/pb/ProtoUtils.java | 16 + .../pb/RejectedSchedulingRequestPBImpl.java | 148 +++++++ .../records/impl/pb/ResourceSizingPBImpl.java | 8 + .../impl/pb/SchedulingRequestPBImpl.java | 11 + .../hadoop/yarn/api/TestPBImplRecords.java | 2 + .../resourcemanager/RMActiveServiceContext.java | 2 +- .../yarn/server/resourcemanager/RMContext.java | 2 +- .../server/resourcemanager/RMContextImpl.java | 2 +- .../server/resourcemanager/ResourceManager.java | 2 +- .../constraint/AllocationTagsManager.java | 431 ------------------- .../constraint/AllocationTagsNamespaces.java | 31 -- .../InvalidAllocationTagsQueryException.java | 35 -- .../constraint/AllocationTagsManager.java | 431 +++++++++++++++++++ .../constraint/AllocationTagsNamespaces.java | 31 ++ .../InvalidAllocationTagsQueryException.java | 35 ++ .../api/ConstraintPlacementAlgorithm.java | 43 ++ .../api/ConstraintPlacementAlgorithmInput.java | 32 ++ .../api/ConstraintPlacementAlgorithmOutput.java | 58 +++ ...traintPlacementAlgorithmOutputCollector.java | 32 ++ 
.../constraint/api/PlacedSchedulingRequest.java | 79 ++++ .../constraint/api/SchedulingResponse.java | 70 +++ .../scheduler/constraint/api/package-info.java | 28 ++ .../constraint/TestAllocationTagsManager.java | 328 -------------- .../rmcontainer/TestRMContainerImpl.java | 2 +- .../scheduler/capacity/TestUtils.java | 2 +- .../constraint/TestAllocationTagsManager.java | 328 ++++++++++++++ .../scheduler/fifo/TestFifoScheduler.java | 2 +- 34 files changed, 1608 insertions(+), 832 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java index 476da8b..8bdfaf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.ams; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdatedContainer; @@ -86,4 +87,19 @@ public final class ApplicationMasterServiceUtils { } allocateResponse.setAllocatedContainers(allocatedContainers); } + + /** + * Add rejected Scheduling Requests to {@link 
AllocateResponse}. + * @param allocateResponse Allocate Response. + * @param rejectedRequests Rejected SchedulingRequests. + */ + public static void addToRejectedSchedulingRequests( + AllocateResponse allocateResponse, + List<RejectedSchedulingRequest> rejectedRequests) { + if (allocateResponse.getRejectedSchedulingRequests() != null + && !allocateResponse.getRejectedSchedulingRequests().isEmpty()) { + rejectedRequests.addAll(allocateResponse.getRejectedSchedulingRequests()); + } + allocateResponse.setRejectedSchedulingRequests(rejectedRequests); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 655c6dc..52c30e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerError; @@ -410,6 +412,27 @@ public abstract class AllocateResponse { public abstract void setContainersFromPreviousAttempts( List<Container> containersFromPreviousAttempt); + /** + * Get a list of all SchedulingRequests that the RM has rejected between + * this allocate call and the previous one. + * @return List of RejectedSchedulingRequests. + */ + @Public + @Unstable + public List<RejectedSchedulingRequest> getRejectedSchedulingRequests() { + return Collections.EMPTY_LIST; + } + + /** + * Add a list of rejected SchedulingRequests to the AllocateResponse. + * @param rejectedRequests List of Rejected Scheduling Requests. + */ + @Private + @Unstable + public void setRejectedSchedulingRequests( + List<RejectedSchedulingRequest> rejectedRequests) { + } + @Private @Unstable public static AllocateResponseBuilder newBuilder() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectedSchedulingRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectedSchedulingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectedSchedulingRequest.java new file mode 100644 index 0000000..6e2d95b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectedSchedulingRequest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.util.Records; + +/** + * This encapsulates a Rejected SchedulingRequest. It contains the offending + * Scheduling Request along with the reason for rejection. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public abstract class RejectedSchedulingRequest { + + /** + * Create new RejectedSchedulingRequest. + * @param reason Rejection Reason. + * @param request Rejected Scheduling Request. + * @return RejectedSchedulingRequest. + */ + public static RejectedSchedulingRequest newInstance(RejectionReason reason, + SchedulingRequest request) { + RejectedSchedulingRequest instance = + Records.newRecord(RejectedSchedulingRequest.class); + instance.setReason(reason); + instance.setRequest(request); + return instance; + } + + /** + * Get Rejection Reason. + * @return Rejection reason. + */ + public abstract RejectionReason getReason(); + + /** + * Set Rejection Reason. + * @param reason Rejection Reason. + */ + public abstract void setReason(RejectionReason reason); + + /** + * Get the Rejected Scheduling Request. + * @return SchedulingRequest. + */ + public abstract SchedulingRequest getRequest(); + + /** + * Set the SchedulingRequest. + * @param request SchedulingRequest. 
+ */ + public abstract void setRequest(SchedulingRequest request); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectionReason.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectionReason.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectionReason.java new file mode 100644 index 0000000..afbc2ed --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectionReason.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Reason for rejecting a Scheduling Request. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public enum RejectionReason { + /** + * This is used to indicate a possible constraint violation. For eg. 
If the + * App requested anti-affinity across 5 container requests, but only 4 nodes + * exist. Another eg. could be if tag A has affinity with tag B and tag B has + * affinity with tag C, but tag A has anti-affinity with tag C, all at a rack + * scope - and only 1 rack exists. Essentially all situations where the + * Algorithm cannot assign a Node to SchedulingRequest. + */ + COULD_NOT_PLACE_ON_NODE, + /** + * This is used to indicate when after the Algorithm has placed a Scheduling + * Request at a node, but the commit failed because the Queue has no + * capacity etc. This can be a transient situation. + */ + COULD_NOT_SCHEDULE_ON_NODE +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index fdc39a7..5cb1177 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -424,6 +424,16 @@ enum AMCommandProto { AM_SHUTDOWN = 2; } +enum RejectionReasonProto { + RRP_COULD_NOT_PLACE_ON_NODE = 1; + RRP_COULD_NOT_SCHEDULE_ON_NODE = 2; +} + +message RejectedSchedulingRequestProto { + required RejectionReasonProto reason = 1; + required SchedulingRequestProto request = 2; +} + message PreemptionMessageProto { optional StrictPreemptionContractProto strictContract = 1; optional PreemptionContractProto contract = 2; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index e49c4e3..92a65ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -120,6 +120,7 @@ message AllocateResponseProto { repeated UpdateContainerErrorProto update_errors = 15; repeated UpdatedContainerProto updated_containers = 16; repeated ContainerProto containers_from_previous_attempts = 17; + repeated RejectedSchedulingRequestProto rejected_scheduling_requests = 18; } enum SchedulerResourceTypes { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 5ca1e73..3ab5563 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import 
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerError; @@ -47,9 +48,11 @@ import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.api.records.impl.pb.RejectedSchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; @@ -81,6 +84,7 @@ public class AllocateResponsePBImpl extends AllocateResponse { private List<NodeReport> updatedNodes = null; private List<UpdateContainerError> updateErrors = null; + private List<RejectedSchedulingRequest> rejectedRequests = null; private PreemptionMessage preempt; private Token amrmToken = null; private Priority appPriority = null; @@ -140,6 +144,13 @@ public class AllocateResponsePBImpl extends AllocateResponse { getContainerStatusProtoIterable(this.completedContainersStatuses); builder.addAllCompletedContainerStatuses(iterable); } + if (this.rejectedRequests != null) { + builder.clearRejectedSchedulingRequests(); + Iterable<YarnProtos.RejectedSchedulingRequestProto> iterable = + getRejectedSchedulingRequestsProtoIterable( + this.rejectedRequests); + builder.addAllRejectedSchedulingRequests(iterable); + } if (this.updatedNodes != null) { builder.clearUpdatedNodes(); Iterable<NodeReportProto> iterable = @@ -471,6 +482,24 @@ public class AllocateResponsePBImpl extends 
AllocateResponse { containersFromPreviousAttempts.addAll(containers); } + @Override + public synchronized List<RejectedSchedulingRequest> + getRejectedSchedulingRequests() { + initRejectedRequestsList(); + return this.rejectedRequests; + } + + @Override + public synchronized void setRejectedSchedulingRequests( + List<RejectedSchedulingRequest> rejectedReqs) { + if (rejectedReqs == null) { + return; + } + initRejectedRequestsList(); + this.rejectedRequests.clear(); + this.rejectedRequests.addAll(rejectedReqs); + } + private synchronized void initLocalUpdatedContainerList() { if (this.updatedContainers != null) { return; @@ -528,6 +557,20 @@ public class AllocateResponsePBImpl extends AllocateResponse { } } + private synchronized void initRejectedRequestsList() { + if (this.rejectedRequests != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List<YarnProtos.RejectedSchedulingRequestProto> list = + p.getRejectedSchedulingRequestsList(); + rejectedRequests = new ArrayList<>(); + + for (YarnProtos.RejectedSchedulingRequestProto c : list) { + rejectedRequests.add(convertFromProtoFormat(c)); + } + } + private synchronized void initLocalNewNMTokenList() { if (nmTokens != null) { return; @@ -712,6 +755,38 @@ public class AllocateResponsePBImpl extends AllocateResponse { } }; } + + private synchronized Iterable<YarnProtos.RejectedSchedulingRequestProto> + getRejectedSchedulingRequestsProtoIterable( + final List<RejectedSchedulingRequest> rejectedReqsList) { + maybeInitBuilder(); + return new Iterable<YarnProtos.RejectedSchedulingRequestProto>() { + @Override + public Iterator<YarnProtos.RejectedSchedulingRequestProto> iterator() { + return new Iterator<YarnProtos.RejectedSchedulingRequestProto>() { + + private Iterator<RejectedSchedulingRequest> iter = + rejectedReqsList.iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized 
YarnProtos.RejectedSchedulingRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + + } + }; + } + }; + } private synchronized Iterable<NodeReportProto> getNodeReportProtoIterable( @@ -808,6 +883,16 @@ public class AllocateResponsePBImpl extends AllocateResponse { return ((ContainerStatusPBImpl)t).getProto(); } + private synchronized RejectedSchedulingRequestPBImpl convertFromProtoFormat( + YarnProtos.RejectedSchedulingRequestProto p) { + return new RejectedSchedulingRequestPBImpl(p); + } + + private synchronized YarnProtos.RejectedSchedulingRequestProto + convertToProtoFormat(RejectedSchedulingRequest t) { + return ((RejectedSchedulingRequestPBImpl)t).getProto(); + } + private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) { return new ResourcePBImpl(p); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 168d864..76e86ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; +import 
org.apache.hadoop.yarn.api.records.RejectionReason; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; @@ -233,6 +234,21 @@ public class ProtoUtils { } /* + * RejectionReason + */ + private static final String REJECTION_REASON_PREFIX = "RRP_"; + public static YarnProtos.RejectionReasonProto convertToProtoFormat( + RejectionReason e) { + return YarnProtos.RejectionReasonProto + .valueOf(REJECTION_REASON_PREFIX + e.name()); + } + public static RejectionReason convertFromProtoFormat( + YarnProtos.RejectionReasonProto e) { + return RejectionReason.valueOf(e.name() + .replace(REJECTION_REASON_PREFIX, "")); + } + + /* * ByteBuffer */ public static ByteBuffer convertFromProtoFormat(ByteString byteString) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/RejectedSchedulingRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/RejectedSchedulingRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/RejectedSchedulingRequestPBImpl.java new file mode 100644 index 0000000..ed78551 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/RejectedSchedulingRequestPBImpl.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.impl.pb; + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; +import org.apache.hadoop.yarn.api.records.RejectionReason; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.proto.YarnProtos; + +/** + * Implementation of RejectedSchedulingRequest. + */ +public class RejectedSchedulingRequestPBImpl extends RejectedSchedulingRequest { + + private YarnProtos.RejectedSchedulingRequestProto proto = + YarnProtos.RejectedSchedulingRequestProto.getDefaultInstance(); + private YarnProtos.RejectedSchedulingRequestProto.Builder builder = null; + private boolean viaProto = false; + private SchedulingRequest request; + + public RejectedSchedulingRequestPBImpl() { + builder = YarnProtos.RejectedSchedulingRequestProto.newBuilder(); + } + + public RejectedSchedulingRequestPBImpl( + YarnProtos.RejectedSchedulingRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized YarnProtos.RejectedSchedulingRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private synchronized void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void mergeLocalToBuilder() { + if (this.request != null) { + builder.setRequest(convertToProtoFormat(this.request)); + } + } + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = YarnProtos.RejectedSchedulingRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public synchronized RejectionReason getReason() { + YarnProtos.RejectedSchedulingRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasReason()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getReason()); + } + + @Override + public synchronized void setReason(RejectionReason reason) { + maybeInitBuilder(); + if (reason == null) { + builder.clearReason(); + return; + } + builder.setReason(ProtoUtils.convertToProtoFormat(reason)); + } + + @Override + public synchronized SchedulingRequest getRequest() { + YarnProtos.RejectedSchedulingRequestProtoOrBuilder p = + viaProto ? 
proto : builder; + if (this.request != null) { + return this.request; + } + if (!p.hasRequest()) { + return null; + } + this.request = convertFromProtoFormat(p.getRequest()); + return this.request; + } + + @Override + public synchronized void setRequest(SchedulingRequest req) { + maybeInitBuilder(); + if (null == req) { + builder.clearRequest(); + } + this.request = req; + } + + private synchronized YarnProtos.SchedulingRequestProto convertToProtoFormat( + SchedulingRequest r) { + return ((SchedulingRequestPBImpl)r).getProto(); + } + + private synchronized SchedulingRequestPBImpl convertFromProtoFormat( + YarnProtos.SchedulingRequestProto p) { + return new SchedulingRequestPBImpl(p); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java index f98e488..4054837 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java @@ -114,4 +114,12 @@ public class ResourceSizingPBImpl extends ResourceSizing { private ResourceProto convertToProtoFormat(Resource r) { return ProtoUtils.convertToProtoFormat(r); } + + @Override + public String toString() { + return "ResourceSizingPBImpl{" + + "numAllocations=" + getNumAllocations() + + ", resources=" + getResources() + + '}'; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java index 305856a..1f86043 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java @@ -279,4 +279,15 @@ public class SchedulingRequestPBImpl extends SchedulingRequest { } return false; } + + @Override + public String toString() { + return "SchedulingRequestPBImpl{" + + "priority=" + getPriority() + + ", allocationReqId=" + getAllocationRequestId() + + ", executionType=" + getExecutionType() + + ", allocationTags=" + getAllocationTags() + + ", resourceSizing=" + getResourceSizing() + + '}'; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index a0b907d..ae80910 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -138,6 +138,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueStatistics; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import org.apache.hadoop.yarn.api.records.ReservationAllocationState; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -436,6 +437,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest { generateByNewInstance(ResourceTypeInfo.class); generateByNewInstance(ResourceSizing.class); generateByNewInstance(SchedulingRequest.class); + generateByNewInstance(RejectedSchedulingRequest.class); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 6ee3a4c..4d0c230 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -33,7 +33,6 @@ import 
org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; -import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -43,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 62899d9..00da108 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; -import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -44,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 315fdc1..da50ef8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; -import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8d1000e..adda465 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Pu import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -94,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java deleted file mode 100644 index b67fab9..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java +++ /dev/null @@ -1,431 +0,0 @@ -/* - * * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * / - */ - -package org.apache.hadoop.yarn.server.resourcemanager.constraint; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.SchedulingRequest; -import org.apache.log4j.Logger; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.LongBinaryOperator; - -/** - * Support storing maps between container-tags/applications and - * nodes. This will be required by affinity/anti-affinity implementation and - * cardinality. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class AllocationTagsManager { - - private static final Logger LOG = Logger.getLogger( - AllocationTagsManager.class); - - private ReentrantReadWriteLock.ReadLock readLock; - private ReentrantReadWriteLock.WriteLock writeLock; - - // Application's tags to node - private Map<ApplicationId, NodeToCountedTags> perAppMappings = - new HashMap<>(); - - // Global tags to node mapping (used to fast return aggregated tags - // cardinality across apps) - private NodeToCountedTags globalMapping = new NodeToCountedTags(); - - /** - * Store node to counted tags. 
- */ - @VisibleForTesting - static class NodeToCountedTags { - // Map<NodeId, Map<Tag, Count>> - private Map<NodeId, Map<String, Long>> nodeToTagsWithCount = - new HashMap<>(); - - // protected by external locks - private void addTagsToNode(NodeId nodeId, Set<String> tags) { - Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId, - k -> new HashMap<>()); - - for (String tag : tags) { - Long count = innerMap.get(tag); - if (count == null) { - innerMap.put(tag, 1L); - } else{ - innerMap.put(tag, count + 1); - } - } - } - - // protected by external locks - private void addTagToNode(NodeId nodeId, String tag) { - Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId, - k -> new HashMap<>()); - - Long count = innerMap.get(tag); - if (count == null) { - innerMap.put(tag, 1L); - } else{ - innerMap.put(tag, count + 1); - } - } - - private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) { - Long count = innerMap.get(tag); - if (count > 1) { - innerMap.put(tag, count - 1); - } else { - if (count <= 0) { - LOG.warn( - "Trying to remove tags from node, however the count already" - + " becomes 0 or less, it could be a potential bug."); - } - innerMap.remove(tag); - } - } - - private void removeTagsFromNode(NodeId nodeId, Set<String> tags) { - Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); - if (innerMap == null) { - LOG.warn("Failed to find node=" + nodeId - + " while trying to remove tags, please double check."); - return; - } - - for (String tag : tags) { - removeTagFromInnerMap(innerMap, tag); - } - - if (innerMap.isEmpty()) { - nodeToTagsWithCount.remove(nodeId); - } - } - - private void removeTagFromNode(NodeId nodeId, String tag) { - Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); - if (innerMap == null) { - LOG.warn("Failed to find node=" + nodeId - + " while trying to remove tags, please double check."); - return; - } - - removeTagFromInnerMap(innerMap, tag); - - if (innerMap.isEmpty()) 
{ - nodeToTagsWithCount.remove(nodeId); - } - } - - private long getCardinality(NodeId nodeId, String tag) { - Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); - if (innerMap == null) { - return 0; - } - Long value = innerMap.get(tag); - return value == null ? 0 : value; - } - - private long getCardinality(NodeId nodeId, Set<String> tags, - LongBinaryOperator op) { - Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); - if (innerMap == null) { - return 0; - } - - long returnValue = 0; - boolean firstTag = true; - - if (tags != null && !tags.isEmpty()) { - for (String tag : tags) { - Long value = innerMap.get(tag); - if (value == null) { - value = 0L; - } - - if (firstTag) { - returnValue = value; - firstTag = false; - continue; - } - - returnValue = op.applyAsLong(returnValue, value); - } - } else { - // Similar to above if, but only iterate values for better performance - for (long value : innerMap.values()) { - // For the first value, we will not apply op - if (firstTag) { - returnValue = value; - firstTag = false; - continue; - } - returnValue = op.applyAsLong(returnValue, value); - } - } - return returnValue; - } - - private boolean isEmpty() { - return nodeToTagsWithCount.isEmpty(); - } - - @VisibleForTesting - public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() { - return nodeToTagsWithCount; - } - } - - @VisibleForTesting - Map<ApplicationId, NodeToCountedTags> getPerAppMappings() { - return perAppMappings; - } - - @VisibleForTesting - NodeToCountedTags getGlobalMapping() { - return globalMapping; - } - - public AllocationTagsManager() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); - } - - /** - * Notify container allocated on a node. - * - * @param nodeId allocated node. - * @param applicationId applicationId - * @param containerId container id. 
- * @param allocationTags allocation tags, see - * {@link SchedulingRequest#getAllocationTags()} - * application_id will be added to allocationTags. - */ - public void addContainer(NodeId nodeId, ApplicationId applicationId, - ContainerId containerId, Set<String> allocationTags) { - String applicationIdTag = - AllocationTagsNamespaces.APP_ID + applicationId.toString(); - - boolean useSet = false; - if (allocationTags != null && !allocationTags.isEmpty()) { - // Copy before edit it. - allocationTags = new HashSet<>(allocationTags); - allocationTags.add(applicationIdTag); - useSet = true; - } - - writeLock.lock(); - try { - NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent( - applicationId, k -> new NodeToCountedTags()); - - if (useSet) { - perAppTagsMapping.addTagsToNode(nodeId, allocationTags); - globalMapping.addTagsToNode(nodeId, allocationTags); - } else { - perAppTagsMapping.addTagToNode(nodeId, applicationIdTag); - globalMapping.addTagToNode(nodeId, applicationIdTag); - } - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Added container=" + containerId + " with tags=[" + StringUtils - .join(allocationTags, ",") + "]"); - } - } finally { - writeLock.unlock(); - } - } - - /** - * Notify container removed. - * - * @param nodeId nodeId - * @param applicationId applicationId - * @param containerId containerId. - * @param allocationTags allocation tags for given container - */ - public void removeContainer(NodeId nodeId, ApplicationId applicationId, - ContainerId containerId, Set<String> allocationTags) { - String applicationIdTag = - AllocationTagsNamespaces.APP_ID + applicationId.toString(); - boolean useSet = false; - - if (allocationTags != null && !allocationTags.isEmpty()) { - // Copy before edit it. 
- allocationTags = new HashSet<>(allocationTags); - allocationTags.add(applicationIdTag); - useSet = true; - } - - writeLock.lock(); - try { - NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId); - if (perAppTagsMapping == null) { - return; - } - - if (useSet) { - perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags); - globalMapping.removeTagsFromNode(nodeId, allocationTags); - } else { - perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag); - globalMapping.removeTagFromNode(nodeId, applicationIdTag); - } - - if (perAppTagsMapping.isEmpty()) { - perAppMappings.remove(applicationId); - } - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Removed container=" + containerId + " with tags=[" + StringUtils - .join(allocationTags, ",") + "]"); - } - } finally { - writeLock.unlock(); - } - } - - /** - * Get cardinality for following conditions. External can pass-in a binary op - * to implement customized logic. * - * @param nodeId nodeId, required. - * @param applicationId applicationId. When null is specified, return - * aggregated cardinality among all nodes. - * @param tag allocation tag, see - * {@link SchedulingRequest#getAllocationTags()}, - * When multiple tags specified. Returns cardinality - * depends on op. If a specified tag doesn't exist, - * 0 will be its cardinality. - * When null/empty tags specified, all tags - * (of the node/app) will be considered. - * @return cardinality of specified query on the node. 
- * @throws InvalidAllocationTagsQueryException when illegal query - * parameter specified - */ - public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId, - String tag) throws InvalidAllocationTagsQueryException { - readLock.lock(); - - try { - if (nodeId == null) { - throw new InvalidAllocationTagsQueryException( - "Must specify nodeId/tags/op to query cardinality"); - } - - NodeToCountedTags mapping; - if (applicationId != null) { - mapping = perAppMappings.get(applicationId); - } else{ - mapping = globalMapping; - } - - if (mapping == null) { - return 0; - } - - return mapping.getCardinality(nodeId, tag); - } finally { - readLock.unlock(); - } - } - - /** - * Check if given tag exists on node. - * - * @param nodeId nodeId, required. - * @param applicationId applicationId. When null is specified, return - * aggregated cardinality among all nodes. - * @param tag allocation tag, see - * {@link SchedulingRequest#getAllocationTags()}, - * When multiple tags specified. Returns cardinality - * depends on op. If a specified tag doesn't exist, - * 0 will be its cardinality. - * When null/empty tags specified, all tags - * (of the node/app) will be considered. - * @return cardinality of specified query on the node. - * @throws InvalidAllocationTagsQueryException when illegal query - * parameter specified - */ - public boolean allocationTagExistsOnNode(NodeId nodeId, - ApplicationId applicationId, String tag) - throws InvalidAllocationTagsQueryException { - return getNodeCardinality(nodeId, applicationId, tag) > 0; - } - - /** - * Get cardinality for following conditions. External can pass-in a binary op - * to implement customized logic. - * - * @param nodeId nodeId, required. - * @param applicationId applicationId. When null is specified, return - * aggregated cardinality among all nodes. - * @param tags allocation tags, see - * {@link SchedulingRequest#getAllocationTags()}, - * When multiple tags specified. Returns cardinality - * depends on op. 
If a specified tag doesn't exist, 0 - * will be its cardinality. When null/empty tags - * specified, all tags (of the node/app) will be - * considered. - * @param op operator. Such as Long::max, Long::sum, etc. Required. - * This sparameter only take effect when #values >= 2. - * @return cardinality of specified query on the node. - * @throws InvalidAllocationTagsQueryException when illegal query - * parameter specified - */ - public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId, - Set<String> tags, LongBinaryOperator op) - throws InvalidAllocationTagsQueryException { - readLock.lock(); - - try { - if (nodeId == null || op == null) { - throw new InvalidAllocationTagsQueryException( - "Must specify nodeId/tags/op to query cardinality"); - } - - NodeToCountedTags mapping; - if (applicationId != null) { - mapping = perAppMappings.get(applicationId); - } else{ - mapping = globalMapping; - } - - if (mapping == null) { - return 0; - } - - return mapping.getCardinality(nodeId, tags, op); - } finally { - readLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java deleted file mode 100644 index 893ff1c..0000000 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * / - */ - -package org.apache.hadoop.yarn.server.resourcemanager.constraint; - -/** - * Predefined namespaces for tags - * - * Same as namespace of resource types. 
Namespaces of placement tags are start - * with alphabets and ended with "/" - */ -public class AllocationTagsNamespaces { - public static final String APP_ID = "yarn_app_id/"; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java deleted file mode 100644 index 5519e39..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * / - */ - -package org.apache.hadoop.yarn.server.resourcemanager.constraint; - -import org.apache.hadoop.yarn.exceptions.YarnException; - -/** - * Exception when invalid parameter specified to do placement tags related - * queries. - */ -public class InvalidAllocationTagsQueryException extends YarnException { - private static final long serialVersionUID = 12312831974894L; - - public InvalidAllocationTagsQueryException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java new file mode 100644 index 0000000..c278606 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -0,0 +1,431 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.LongBinaryOperator; + +/** + * Support storing maps between container-tags/applications and + * nodes. This will be required by affinity/anti-affinity implementation and + * cardinality. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class AllocationTagsManager { + + private static final Logger LOG = Logger.getLogger( + AllocationTagsManager.class); + + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + + // Application's tags to node + private Map<ApplicationId, NodeToCountedTags> perAppMappings = + new HashMap<>(); + + // Global tags to node mapping (used to fast return aggregated tags + // cardinality across apps) + private NodeToCountedTags globalMapping = new NodeToCountedTags(); + + /** + * Store node to counted tags. 
+ */ + @VisibleForTesting + static class NodeToCountedTags { + // Map<NodeId, Map<Tag, Count>> + private Map<NodeId, Map<String, Long>> nodeToTagsWithCount = + new HashMap<>(); + + // protected by external locks + private void addTagsToNode(NodeId nodeId, Set<String> tags) { + Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId, + k -> new HashMap<>()); + + for (String tag : tags) { + Long count = innerMap.get(tag); + if (count == null) { + innerMap.put(tag, 1L); + } else{ + innerMap.put(tag, count + 1); + } + } + } + + // protected by external locks + private void addTagToNode(NodeId nodeId, String tag) { + Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId, + k -> new HashMap<>()); + + Long count = innerMap.get(tag); + if (count == null) { + innerMap.put(tag, 1L); + } else{ + innerMap.put(tag, count + 1); + } + } + + private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) { + Long count = innerMap.get(tag); + if (count > 1) { + innerMap.put(tag, count - 1); + } else { + if (count <= 0) { + LOG.warn( + "Trying to remove tags from node, however the count already" + + " becomes 0 or less, it could be a potential bug."); + } + innerMap.remove(tag); + } + } + + private void removeTagsFromNode(NodeId nodeId, Set<String> tags) { + Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + LOG.warn("Failed to find node=" + nodeId + + " while trying to remove tags, please double check."); + return; + } + + for (String tag : tags) { + removeTagFromInnerMap(innerMap, tag); + } + + if (innerMap.isEmpty()) { + nodeToTagsWithCount.remove(nodeId); + } + } + + private void removeTagFromNode(NodeId nodeId, String tag) { + Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + LOG.warn("Failed to find node=" + nodeId + + " while trying to remove tags, please double check."); + return; + } + + removeTagFromInnerMap(innerMap, tag); + + if (innerMap.isEmpty()) 
{ + nodeToTagsWithCount.remove(nodeId); + } + } + + private long getCardinality(NodeId nodeId, String tag) { + Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + return 0; + } + Long value = innerMap.get(tag); + return value == null ? 0 : value; + } + + private long getCardinality(NodeId nodeId, Set<String> tags, + LongBinaryOperator op) { + Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + return 0; + } + + long returnValue = 0; + boolean firstTag = true; + + if (tags != null && !tags.isEmpty()) { + for (String tag : tags) { + Long value = innerMap.get(tag); + if (value == null) { + value = 0L; + } + + if (firstTag) { + returnValue = value; + firstTag = false; + continue; + } + + returnValue = op.applyAsLong(returnValue, value); + } + } else { + // Similar to above if, but only iterate values for better performance + for (long value : innerMap.values()) { + // For the first value, we will not apply op + if (firstTag) { + returnValue = value; + firstTag = false; + continue; + } + returnValue = op.applyAsLong(returnValue, value); + } + } + return returnValue; + } + + private boolean isEmpty() { + return nodeToTagsWithCount.isEmpty(); + } + + @VisibleForTesting + public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() { + return nodeToTagsWithCount; + } + } + + @VisibleForTesting + Map<ApplicationId, NodeToCountedTags> getPerAppMappings() { + return perAppMappings; + } + + @VisibleForTesting + NodeToCountedTags getGlobalMapping() { + return globalMapping; + } + + public AllocationTagsManager() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + /** + * Notify container allocated on a node. + * + * @param nodeId allocated node. + * @param applicationId applicationId + * @param containerId container id. 
+ * @param allocationTags allocation tags, see + * {@link SchedulingRequest#getAllocationTags()} + * application_id will be added to allocationTags. + */ + public void addContainer(NodeId nodeId, ApplicationId applicationId, + ContainerId containerId, Set<String> allocationTags) { + String applicationIdTag = + AllocationTagsNamespaces.APP_ID + applicationId.toString(); + + boolean useSet = false; + if (allocationTags != null && !allocationTags.isEmpty()) { + // Copy before edit it. + allocationTags = new HashSet<>(allocationTags); + allocationTags.add(applicationIdTag); + useSet = true; + } + + writeLock.lock(); + try { + NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent( + applicationId, k -> new NodeToCountedTags()); + + if (useSet) { + perAppTagsMapping.addTagsToNode(nodeId, allocationTags); + globalMapping.addTagsToNode(nodeId, allocationTags); + } else { + perAppTagsMapping.addTagToNode(nodeId, applicationIdTag); + globalMapping.addTagToNode(nodeId, applicationIdTag); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Added container=" + containerId + " with tags=[" + StringUtils + .join(allocationTags, ",") + "]"); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Notify container removed. + * + * @param nodeId nodeId + * @param applicationId applicationId + * @param containerId containerId. + * @param allocationTags allocation tags for given container + */ + public void removeContainer(NodeId nodeId, ApplicationId applicationId, + ContainerId containerId, Set<String> allocationTags) { + String applicationIdTag = + AllocationTagsNamespaces.APP_ID + applicationId.toString(); + boolean useSet = false; + + if (allocationTags != null && !allocationTags.isEmpty()) { + // Copy before edit it. 
+ allocationTags = new HashSet<>(allocationTags); + allocationTags.add(applicationIdTag); + useSet = true; + } + + writeLock.lock(); + try { + NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId); + if (perAppTagsMapping == null) { + return; + } + + if (useSet) { + perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags); + globalMapping.removeTagsFromNode(nodeId, allocationTags); + } else { + perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag); + globalMapping.removeTagFromNode(nodeId, applicationIdTag); + } + + if (perAppTagsMapping.isEmpty()) { + perAppMappings.remove(applicationId); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Removed container=" + containerId + " with tags=[" + StringUtils + .join(allocationTags, ",") + "]"); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Get the cardinality of a single allocation tag on a node. + * + * @param nodeId nodeId, required. + * @param applicationId applicationId. When null is specified, return + * aggregated cardinality among all applications. + * @param tag allocation tag, see + * {@link SchedulingRequest#getAllocationTags()}. + * If the specified tag doesn't exist on the node, + * 0 will be its cardinality. + * @return cardinality of specified query on the node.
+ * @throws InvalidAllocationTagsQueryException when illegal query + * parameter specified + */ + public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId, + String tag) throws InvalidAllocationTagsQueryException { + readLock.lock(); + + try { + if (nodeId == null) { + throw new InvalidAllocationTagsQueryException( + "Must specify nodeId/tags/op to query cardinality"); + } + + NodeToCountedTags mapping; + if (applicationId != null) { + mapping = perAppMappings.get(applicationId); + } else{ + mapping = globalMapping; + } + + if (mapping == null) { + return 0; + } + + return mapping.getCardinality(nodeId, tag); + } finally { + readLock.unlock(); + } + } + + /** + * Check if given tag exists on node. + * + * @param nodeId nodeId, required. + * @param applicationId applicationId. When null is specified, the tag is + * looked up across all applications. + * @param tag allocation tag, see + * {@link SchedulingRequest#getAllocationTags()}. + * @return true if the cardinality of the tag on the node is greater + * than 0, false otherwise. + * @throws InvalidAllocationTagsQueryException when illegal query + * parameter specified + */ + public boolean allocationTagExistsOnNode(NodeId nodeId, + ApplicationId applicationId, String tag) + throws InvalidAllocationTagsQueryException { + return getNodeCardinality(nodeId, applicationId, tag) > 0; + } + + /** + * Get cardinality for the following conditions. Callers can pass in a binary op + * to implement customized logic. + * + * @param nodeId nodeId, required. + * @param applicationId applicationId. When null is specified, return + * aggregated cardinality among all applications. + * @param tags allocation tags, see + * {@link SchedulingRequest#getAllocationTags()}. + * When multiple tags are specified, the returned cardinality + * depends on op.
If a specified tag doesn't exist, 0 + * will be its cardinality. When null/empty tags are + * specified, all tags (of the node/app) will be + * considered. + * @param op operator. Such as Long::max, Long::sum, etc. Required. + * This parameter only takes effect when #values >= 2. + * @return cardinality of specified query on the node. + * @throws InvalidAllocationTagsQueryException when illegal query + * parameter specified + */ + public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId, + Set<String> tags, LongBinaryOperator op) + throws InvalidAllocationTagsQueryException { + readLock.lock(); + + try { + if (nodeId == null || op == null) { + throw new InvalidAllocationTagsQueryException( + "Must specify nodeId/tags/op to query cardinality"); + } + + NodeToCountedTags mapping; + if (applicationId != null) { + mapping = perAppMappings.get(applicationId); + } else{ + mapping = globalMapping; + } + + if (mapping == null) { + return 0; + } + + return mapping.getCardinality(nodeId, tags, op); + } finally { + readLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java new file mode 100644 index 0000000..43fcfe5 --- /dev/null +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java @@ -0,0 +1,31 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +/** + * Predefined namespaces for tags + * + * Same as namespace of resource types. 
Namespaces of placement tags start + * with letters and end with "/" + */ +public class AllocationTagsNamespaces { + public static final String APP_ID = "yarn_app_id/"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java new file mode 100644 index 0000000..29483a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java @@ -0,0 +1,35 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Exception when invalid parameter specified to do placement tags related + * queries. + */ +public class InvalidAllocationTagsQueryException extends YarnException { + private static final long serialVersionUID = 12312831974894L; + + public InvalidAllocationTagsQueryException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java new file mode 100644 index 0000000..2651663 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api; + +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + +/** + * Interface for a Constraint Placement Algorithm. An implementation must be + * initialized with the RMContext before it is asked to place requests. + */ +public interface ConstraintPlacementAlgorithm { + + /** + * Initialize the Algorithm. + * @param rmContext RMContext. + */ + void init(RMContext rmContext); + + /** + * The Algorithm is expected to compute the placement of the provided + * ConstraintPlacementAlgorithmInput and use the collector to aggregate + * any output. + * @param algorithmInput Input to the Algorithm. + * @param collector Collector for output of algorithm.
+ */ + void place(ConstraintPlacementAlgorithmInput algorithmInput, + ConstraintPlacementAlgorithmOutputCollector collector); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d70b0467/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmInput.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmInput.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmInput.java new file mode 100644 index 0000000..74572b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmInput.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api; + +import org.apache.hadoop.yarn.api.records.SchedulingRequest; + +import java.util.Collection; + +/** + * This encapsulates an input to the Constraint Placement Algorithm. At the + * very least it must consist of a collection of SchedulingRequests. + */ +public interface ConstraintPlacementAlgorithmInput { + + Collection<SchedulingRequest> getSchedulingRequests(); + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org