Repository: hadoop Updated Branches: refs/heads/trunk b6e50fad5 -> 50723889c
YARN-7778. Merging of placement constraints defined at different levels. Contributed by Weiwei Yang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/50723889 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/50723889 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/50723889 Branch: refs/heads/trunk Commit: 50723889cc29e8dadfa6ab6afbb90ac798d66878 Parents: b6e50fa Author: Konstantinos Karanasos <kkarana...@apache.org> Authored: Fri Feb 2 14:43:54 2018 -0800 Committer: Konstantinos Karanasos <kkarana...@apache.org> Committed: Fri Feb 2 14:46:20 2018 -0800 ---------------------------------------------------------------------- .../MemoryPlacementConstraintManager.java | 42 ++++++++++ .../constraint/PlacementConstraintManager.java | 13 ++++ .../constraint/PlacementConstraintsUtil.java | 24 ++---- .../TestPlacementConstraintManagerService.java | 82 ++++++++++++++++++++ ...stSingleConstraintAppPlacementAllocator.java | 5 ++ 5 files changed, 150 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/50723889/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java index ceff6f6..5cb8b99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java @@ -24,6 +24,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.List; +import java.util.ArrayList; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -33,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,6 +240,45 @@ public class MemoryPlacementConstraintManager } @Override + public PlacementConstraint getMultilevelConstraint(ApplicationId appId, + Set<String> sourceTags, PlacementConstraint schedulingRequestConstraint) { + List<PlacementConstraint> constraints = new ArrayList<>(); + // Add scheduling request-level constraint. + if (schedulingRequestConstraint != null) { + constraints.add(schedulingRequestConstraint); + } + // Add app-level constraint if appId is given. + if (appId != null && sourceTags != null + && !sourceTags.isEmpty()) { + constraints.add(getConstraint(appId, sourceTags)); + } + // Add global constraint. + if (sourceTags != null && !sourceTags.isEmpty()) { + constraints.add(getGlobalConstraint(sourceTags)); + } + + // Remove all null or duplicate constraints. + List<PlacementConstraint.AbstractConstraint> allConstraints = + constraints.stream() + .filter(placementConstraint -> placementConstraint != null + && placementConstraint.getConstraintExpr() != null) + .map(PlacementConstraint::getConstraintExpr) + .distinct() + .collect(Collectors.toList()); + + // Compose an AND constraint + // When merge request(RC), app(AC) and global constraint(GC), + // we do a merge on them with CC=AND(GC, AC, RC) and returns a + // composite AND constraint. Subsequently we check if CC could + // be satisfied. This ensures that every level of constraint + // is satisfied. + PlacementConstraint.And andConstraint = PlacementConstraints.and( + allConstraints.toArray(new PlacementConstraint + .AbstractConstraint[allConstraints.size()])); + return andConstraint.build(); + } + + @Override public void unregisterApplication(ApplicationId appId) { try { writeLock.lock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/50723889/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java index 7725d0d..bf2365c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java @@ -105,6 +105,19 @@ public interface PlacementConstraintManager { PlacementConstraint getGlobalConstraint(Set<String> sourceTags); /** + * Consider all levels of constraints (scheduling request, app, cluster) and + * return a merged constraint. + * + * @param applicationId application ID + * @param sourceTags a set of source allocation tags + * @param schedulingRequestConstraint placement constraint at scheduling + * request level + * @return a merged placement constraint + */ + PlacementConstraint getMultilevelConstraint(ApplicationId applicationId, + Set<String> sourceTags, PlacementConstraint schedulingRequestConstraint); + + /** * Remove the constraints that correspond to a given application. * * @param appId the application that will be removed. http://git-wip-us.apache.org/repos/asf/hadoop/blob/50723889/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index 6396e57..ab0bbd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -248,22 +248,14 @@ public final class PlacementConstraintsUtil { SchedulingRequest request, SchedulerNode schedulerNode, PlacementConstraintManager pcm, AllocationTagsManager atm) throws InvalidAllocationTagsQueryException { - // TODO do proper merge on different level of constraints, see YARN-7778. - - // Request level constraint - PlacementConstraint constraint = request.getPlacementConstraint(); - if (constraint == null) { - // Application level constraint - constraint = pcm.getConstraint(applicationId, - request.getAllocationTags()); - if (constraint == null) { - // Global level constraint - constraint = pcm.getGlobalConstraint(request.getAllocationTags()); - if (constraint == null) { - return true; - } - } + Set<String> sourceTags = null; + PlacementConstraint pc = null; + if (request != null) { + sourceTags = request.getAllocationTags(); + pc = request.getPlacementConstraint(); } - return canSatisfyConstraints(applicationId, constraint, schedulerNode, atm); + return canSatisfyConstraints(applicationId, + pcm.getMultilevelConstraint(applicationId, sourceTags, pc), + schedulerNode, atm); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/50723889/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java index abcab1a..976906d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java @@ -34,8 +34,11 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.collect.Sets; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; @@ -179,4 +182,83 @@ public class TestPlacementConstraintManagerService { Assert.assertTrue(pcm.validateConstraint(sourceTag1, c1)); Assert.assertFalse(pcm.validateConstraint(sourceTag4, c1)); } + + @Test + public void testGetRequestConstraint() { + // Request Constraint(RC), App Constraint(AC), Global Constraint(GC) + PlacementConstraint constraint; + And mergedConstraint; + SchedulingRequest request; + + // RC = c1 + // AC = null + // GC = null + constraint = pcm.getMultilevelConstraint(appId1, null, c1); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + Assert.assertEquals(1, mergedConstraint.getChildren().size()); + Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build()); + + // RC = null + // AC = tag1->c1, tag2->c2 + // GC = null + pcm.registerApplication(appId1, constraintMap1); + // if the source tag in the request is not mapped to any existing + // registered constraint, we should get an empty AND constraint. + constraint = pcm.getMultilevelConstraint(appId1, + Sets.newHashSet("not_exist_tag"), null); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND() + Assert.assertEquals(0, mergedConstraint.getChildren().size()); + // if a mapping is found for a given source tag + constraint = pcm.getMultilevelConstraint(appId1, sourceTag1, null); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1) + Assert.assertEquals(1, mergedConstraint.getChildren().size()); + Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build()); + pcm.unregisterApplication(appId1); + + // RC = null + // AC = null + // GC = tag1->c1 + pcm.addGlobalConstraint(sourceTag1, c1, true); + constraint = pcm.getMultilevelConstraint(appId1, + Sets.newHashSet(sourceTag1), null); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1) + Assert.assertEquals(1, mergedConstraint.getChildren().size()); + Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build()); + pcm.removeGlobalConstraint(sourceTag1); + + // RC = c2 + // AC = tag1->c1, tag2->c2 + // GC = tag1->c3 + pcm.addGlobalConstraint(sourceTag1, c3, true); + pcm.registerApplication(appId1, constraintMap1); + // both RC, AC and GC should be respected + constraint = pcm.getMultilevelConstraint(appId1, sourceTag1, c2); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1, c2, c3) + Assert.assertEquals(3, mergedConstraint.getChildren().size()); + pcm.removeGlobalConstraint(sourceTag1); + pcm.unregisterApplication(appId1); + + // RC = c1 + // AC = tag1->c1, tag2->c2 + // GC = tag1->c2 + pcm.addGlobalConstraint(sourceTag1, c2, true); + pcm.registerApplication(appId1, constraintMap1); + constraint = pcm.getMultilevelConstraint(appId1, + Sets.newHashSet(sourceTag1), c1); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1, c2) + Assert.assertEquals(2, mergedConstraint.getChildren().size()); + pcm.removeGlobalConstraint(sourceTag1); + pcm.unregisterApplication(appId1); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/50723889/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java index 3485ea8..9be56ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java @@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Scheduli import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.junit.Assert; import org.junit.Before; @@ -76,10 +78,13 @@ public class TestSingleConstraintAppPlacementAllocator { // Create allocation tags manager AllocationTagsManager allocationTagsManager = new AllocationTagsManager( rmContext); + PlacementConstraintManager placementConstraintManager = + new MemoryPlacementConstraintManager(); spyAllocationTagsManager = spy(allocationTagsManager); schedulerRequestKey = new SchedulerRequestKey(Priority.newInstance(1), 2L, TestUtils.getMockContainerId(1, 1)); rmContext.setAllocationTagsManager(spyAllocationTagsManager); + rmContext.setPlacementConstraintManager(placementConstraintManager); // Create allocator allocator = new SingleConstraintAppPlacementAllocator(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org