YARN-7448. [API] Add SchedulingRequest to the AllocateRequest. (Panagiotis Garefalakis via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69de9a1b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69de9a1b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69de9a1b Branch: refs/heads/trunk Commit: 69de9a1ba9a587c7e03ae7c7aeae93e04c36d665 Parents: db92855 Author: Arun Suresh <asur...@apache.org> Authored: Fri Nov 17 10:42:43 2017 -0800 Committer: Arun Suresh <asur...@apache.org> Committed: Wed Jan 31 01:30:17 2018 -0800 ---------------------------------------------------------------------- .../api/protocolrecords/AllocateRequest.java | 42 ++++++++++ .../hadoop/yarn/api/records/ResourceSizing.java | 27 +++++++ .../yarn/api/records/SchedulingRequest.java | 1 + .../src/main/proto/yarn_service_protos.proto | 1 + .../impl/pb/AllocateRequestPBImpl.java | 83 ++++++++++++++++++++ .../records/impl/pb/ResourceSizingPBImpl.java | 2 +- .../impl/pb/SchedulingRequestPBImpl.java | 16 ++++ .../hadoop/yarn/api/TestPBImplRecords.java | 19 +++++ 8 files changed, 190 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/69de9a1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index ae0891e..d8d2347 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.util.Collections; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -28,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.util.Records; @@ -212,6 +214,32 @@ public abstract class AllocateRequest { public abstract void setUpdateRequests( List<UpdateContainerRequest> updateRequests); + /** + * Get the list of Scheduling requests being sent by the + * <code>ApplicationMaster</code>. + * @return list of {@link SchedulingRequest} being sent by the + * <code>ApplicationMaster</code>. + */ + @Public + @Unstable + public List<SchedulingRequest> getSchedulingRequests() { + return Collections.EMPTY_LIST; + } + + /** + * Set the list of Scheduling requests to inform the + * <code>ResourceManager</code> about the application's resource requirements + * (potentially including allocation tags & placement constraints). + * @param schedulingRequests list of <code>SchedulingRequest</code> to update + * the <code>ResourceManager</code> about the application's resource + * requirements. + */ + @Public + @Unstable + public void setSchedulingRequests( + List<SchedulingRequest> schedulingRequests) { + } + @Public @Unstable public static AllocateRequestBuilder newBuilder() { @@ -314,6 +342,20 @@ public abstract class AllocateRequest { } /** + * Set the <code>schedulingRequests</code> of the request. + * @see AllocateRequest#setSchedulingRequests(List) + * @param schedulingRequests <code>SchedulingRequest</code> of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Unstable + public AllocateRequestBuilder schedulingRequests( + List<SchedulingRequest> schedulingRequests) { + allocateRequest.setSchedulingRequests(schedulingRequests); + return this; + } + + /** * Return generated {@link AllocateRequest} object. * @return {@link AllocateRequest} */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/69de9a1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java index d82be11..8cdc63f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java @@ -61,4 +61,31 @@ public abstract class ResourceSizing { @Public @Unstable public abstract void setResources(Resource resources); + + @Override + public int hashCode() { + int result = getResources().hashCode(); + result = 31 * result + getNumAllocations(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if(obj == null || getClass() != obj.getClass()) { + return false; + } + + ResourceSizing that = (ResourceSizing) obj; + + if(getNumAllocations() != that.getNumAllocations()) { + return false; + } + if(!getResources().equals(that.getResources())) { + return false; + } + return true; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69de9a1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java index 47a0697..e32dd24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java @@ -49,6 +49,7 @@ public abstract class SchedulingRequest { return SchedulingRequest.newBuilder() .allocationRequestId(allocationRequestId).priority(priority) .executionType(executionType).allocationTags(allocationTags) + .resourceSizing(resourceSizing) .placementConstraintExpression(placementConstraintExpression).build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69de9a1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 68e585d..e49c4e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -91,6 +91,7 @@ message AllocateRequestProto { optional int32 response_id = 4; optional float progress = 5; repeated UpdateContainerRequestProto update_requests = 7; + repeated SchedulingRequestProto scheduling_requests = 10; } message NMTokenProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/69de9a1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index 0f0f571..b460044 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -29,14 +29,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProtoOrBuilder; @@ -53,6 +56,7 @@ public class AllocateRequestPBImpl extends AllocateRequest { private List<ResourceRequest> ask = null; private List<ContainerId> release = null; private List<UpdateContainerRequest> updateRequests = null; + private List<SchedulingRequest> schedulingRequests = null; private ResourceBlacklistRequest blacklistRequest = null; public AllocateRequestPBImpl() { @@ -101,6 +105,9 @@ public class AllocateRequestPBImpl extends AllocateRequest { if (this.updateRequests != null) { addUpdateRequestsToProto(); } + if (this.schedulingRequests != null) { + addSchedulingRequestsToProto(); + } if (this.blacklistRequest != null) { builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); } @@ -178,6 +185,23 @@ public class AllocateRequestPBImpl extends AllocateRequest { } @Override + public List<SchedulingRequest> getSchedulingRequests() { + initSchedulingRequests(); + return this.schedulingRequests; + } + + @Override + public void setSchedulingRequests( + List<SchedulingRequest> schedulingRequests) { + if (schedulingRequests == null) { + return; + } + initSchedulingRequests(); + this.schedulingRequests.clear(); + this.schedulingRequests.addAll(schedulingRequests); + } + + @Override public ResourceBlacklistRequest getResourceBlacklistRequest() { AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; if (this.blacklistRequest != null) { @@ -261,6 +285,20 @@ public class AllocateRequestPBImpl extends AllocateRequest { } } + private void initSchedulingRequests() { + if (this.schedulingRequests != null) { + return; + } + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + List<SchedulingRequestProto> list = + p.getSchedulingRequestsList(); + this.schedulingRequests = new ArrayList<>(); + + for (SchedulingRequestProto c : list) { + this.schedulingRequests.add(convertFromProtoFormat(c)); + } + } + private void addUpdateRequestsToProto() { maybeInitBuilder(); builder.clearUpdateRequests(); @@ -297,6 +335,41 @@ public class AllocateRequestPBImpl extends AllocateRequest { builder.addAllUpdateRequests(iterable); } + private void addSchedulingRequestsToProto() { + maybeInitBuilder(); + builder.clearSchedulingRequests(); + if (schedulingRequests == null) { + return; + } + Iterable<SchedulingRequestProto> iterable = + new Iterable<SchedulingRequestProto>() { + @Override + public Iterator<SchedulingRequestProto> iterator() { + return new Iterator<SchedulingRequestProto>() { + + private Iterator<SchedulingRequest> iter = + schedulingRequests.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public SchedulingRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + + } + }; + builder.addAllSchedulingRequests(iterable); + } @Override public List<ContainerId> getReleaseList() { initReleases(); @@ -377,6 +450,16 @@ public class AllocateRequestPBImpl extends AllocateRequest { return ((UpdateContainerRequestPBImpl) t).getProto(); } + private SchedulingRequestPBImpl convertFromProtoFormat( + SchedulingRequestProto p) { + return new SchedulingRequestPBImpl(p); + } + + private SchedulingRequestProto convertToProtoFormat( + SchedulingRequest t) { + return ((SchedulingRequestPBImpl) t).getProto(); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69de9a1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java index 05bb3bd..f98e488 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java @@ -112,6 +112,6 @@ public class ResourceSizingPBImpl extends ResourceSizing { } private ResourceProto convertToProtoFormat(Resource r) { - return ((ResourcePBImpl) r).getProto(); + return ProtoUtils.convertToProtoFormat(r); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69de9a1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java index 7826b36..305856a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java @@ -263,4 +263,20 @@ public class SchedulingRequestPBImpl extends SchedulingRequest { this.allocationTags = new HashSet<>(); this.allocationTags.addAll(p.getAllocationTagsList()); } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69de9a1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index c5585c2..a0b907d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -149,8 +149,10 @@ import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.Token; @@ -189,7 +191,9 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl import org.apache.hadoop.yarn.api.records.impl.pb.ResourceOptionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceSizingPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceTypeInfoPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; @@ -225,6 +229,8 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto; import org.apache.hadoop.yarn.proto.YarnProtos.StrictPreemptionContractProto; import org.apache.hadoop.yarn.proto.YarnProtos.URLProto; @@ -428,6 +434,8 @@ public class TestPBImplRecords extends BasePBImplRecordsTest { generateByNewInstance(QueueConfigurations.class); generateByNewInstance(CollectorInfo.class); generateByNewInstance(ResourceTypeInfo.class); + generateByNewInstance(ResourceSizing.class); + generateByNewInstance(SchedulingRequest.class); } @Test @@ -907,6 +915,17 @@ public class TestPBImplRecords extends BasePBImplRecordsTest { } @Test + public void testResourceSizingPBImpl() throws Exception { + validatePBImplRecord(ResourceSizingPBImpl.class, ResourceSizingProto.class); + } + + @Test + public void testSchedulingRequestPBImpl() throws Exception { + validatePBImplRecord(SchedulingRequestPBImpl.class, + SchedulingRequestProto.class); + } + + @Test public void testSerializedExceptionPBImpl() throws Exception { validatePBImplRecord(SerializedExceptionPBImpl.class, SerializedExceptionProto.class); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org