Author: bikas
Date: Tue Jul 16 00:28:29 2013
New Revision: 1503527
URL: http://svn.apache.org/r1503527
Log:
Merge r1503526 from trunk to branch-2 for YARN-521. Augment AM - RM client
module to be able to request containers only at specific locations (Sandy Ryza
via bikas)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/InvalidContainerRequestException.java
- copied unchanged from r1503526,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/InvalidContainerRequestException.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1503527&r1=1503526&r2=1503527&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Jul 16
00:28:29 2013
@@ -459,6 +459,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-569. Add support for requesting and enforcing preemption requests via
a capacity monitor. (Carlo Curino, cdouglas)
+ YARN-521. Augment AM - RM client module to be able to request containers
+ only at specific locations (Sandy Ryza via bikas)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1503527&r1=1503526&r2=1503527&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
Tue Jul 16 00:28:29 2013
@@ -69,24 +69,32 @@ public abstract class AMRMClient<T exten
}
/**
- * Object to represent container request for resources. Scheduler
+ * Object to represent a container request for resources. Scheduler
* documentation should be consulted for the specifics of how the parameters
* are honored.
- * All getters return immutable values.
*
- * @param capability
- * The {@link Resource} to be requested for each container.
- * @param nodes
- * Any hosts to request that the containers are placed on.
- * @param racks
- * Any racks to request that the containers are placed on. The racks
- * corresponding to any hosts requested will be automatically added to
- * this list.
- * @param priority
- * The priority at which to request the containers. Higher priorities have
- * lower numerical values.
- * @param containerCount
- * The number of containers to request.
+ * By default, YARN schedulers try to allocate containers at the requested
+ * locations but they may relax the constraints in order to expedite meeting
+ * allocations limits. They first relax the constraint to the same rack as
the
+ * requested node and then to anywhere in the cluster. The relaxLocality flag
+ * may be used to disable locality relaxation and request containers at only
+ * specific locations. The following conditions apply.
+ * <ul>
+ * <li>Within a priority, all container requests must have the same value for
+ * locality relaxation. Either enabled or disabled.</li>
+ * <li>If locality relaxation is disabled, then across requests, locations at
+ * different network levels may not be specified. E.g. its invalid to make a
+ * request for a specific node and another request for a specific rack.</li>
+ * <li>If locality relaxation is disabled, then only within the same
request,
+ * a node and its rack may be specified together. This allows for a specific
+ * rack with a preference for a specific node within that rack.</li>
+ * <li></li>
+ * </ul>
+ * To re-enable locality relaxation at a given priority, all pending
requests
+ * with locality relaxation disabled must be first removed. Then they can be
+ * added back with locality relaxation enabled.
+ *
+ * All getters return immutable values.
*/
public static class ContainerRequest {
final Resource capability;
@@ -94,9 +102,55 @@ public abstract class AMRMClient<T exten
final List<String> racks;
final Priority priority;
final int containerCount;
-
+ final boolean relaxLocality;
+
+ /**
+ * Instantiates a {@link ContainerRequest} with the given constraints and
+ * locality relaxation enabled.
+ *
+ * @param capability
+ * The {@link Resource} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The
+ * racks corresponding to any hosts requested will be
automatically
+ * added to this list.
+ * @param priority
+ * The priority at which to request the containers. Higher
+ * priorities have lower numerical values.
+ * @param containerCount
+ * The number of containers to request.
+ */
public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, int containerCount) {
+ this(capability, nodes, racks, priority, containerCount, true);
+ }
+
+ /**
+ * Instantiates a {@link ContainerRequest} with the given constraints.
+ *
+ * @param capability
+ * The {@link Resource} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The
+ * racks corresponding to any hosts requested will be
automatically
+ * added to this list.
+ * @param priority
+ * The priority at which to request the containers. Higher
+ * priorities have lower numerical values.
+ * @param containerCount
+ * The number of containers to request.
+ * @param relaxLocality
+ * If true, containers for this request may be assigned on hosts
+ * and racks other than the ones explicitly requested.
+ */
+ public ContainerRequest(Resource capability, String[] nodes,
+ String[] racks, Priority priority, int containerCount,
+ boolean relaxLocality) {
+ // Validate request
Preconditions.checkArgument(capability != null,
"The Resource to be requested for each container " +
"should not be null ");
@@ -104,11 +158,17 @@ public abstract class AMRMClient<T exten
"The priority at which to request containers should not be null ");
Preconditions.checkArgument(containerCount > 0,
"The number of containers to request should larger than 0");
+ Preconditions.checkArgument(
+ (!relaxLocality && (racks == null || racks.length == 0)
+ && (nodes == null || nodes.length == 0)),
+ "Can't turn off locality relaxation on a " +
+ "request with no location constraints");
this.capability = capability;
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
this.priority = priority;
this.containerCount = containerCount;
+ this.relaxLocality = relaxLocality;
}
public Resource getCapability() {
@@ -131,6 +191,10 @@ public abstract class AMRMClient<T exten
return containerCount;
}
+ public boolean getRelaxLocality() {
+ return relaxLocality;
+ }
+
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");
@@ -154,6 +218,11 @@ public abstract class AMRMClient<T exten
String[] racks, Priority priority) {
super(capability, nodes, racks, priority, 1);
}
+
+ public StoredContainerRequest(Resource capability, String[] nodes,
+ String[] racks, Priority priority, boolean relaxLocality) {
+ super(capability, nodes, racks, priority, 1, relaxLocality);
+ }
}
/**
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1503527&r1=1503526&r2=1503527&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
Tue Jul 16 00:28:29 2013
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -75,6 +77,8 @@ import com.google.common.base.Preconditi
public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+ private static final List<String> ANY_LIST =
+ Collections.singletonList(ResourceRequest.ANY);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -91,9 +95,10 @@ public class AMRMClientImpl<T extends Co
LinkedHashSet<T> containerRequests;
ResourceRequestInfo(Priority priority, String resourceName,
- Resource capability) {
+ Resource capability, boolean relaxLocality) {
remoteRequest = ResourceRequest.newInstance(priority, resourceName,
capability, 0);
+ remoteRequest.setRelaxLocality(relaxLocality);
containerRequests = new LinkedHashSet<T>();
}
}
@@ -226,7 +231,7 @@ public class AMRMClientImpl<T extends Co
@Override
public AllocateResponse allocate(float progressIndicator)
throws YarnException, IOException {
- Preconditions.checkArgument(progressIndicator > 0,
+ Preconditions.checkArgument(progressIndicator >= 0,
"Progress indicator should not be negative");
AllocateResponse allocateResponse = null;
ArrayList<ResourceRequest> askList = null;
@@ -326,17 +331,30 @@ public class AMRMClientImpl<T extends Co
public synchronized void addContainerRequest(T req) {
Preconditions.checkArgument(req != null,
"Resource request can not be null.");
- Set<String> allRacks = new HashSet<String>();
+ Set<String> dedupedRacks = new HashSet<String>();
if (req.getRacks() != null) {
- allRacks.addAll(req.getRacks());
- if(req.getRacks().size() != allRacks.size()) {
+ dedupedRacks.addAll(req.getRacks());
+ if(req.getRacks().size() != dedupedRacks.size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate racks: "
+ joiner.join(req.getRacks()));
}
}
- allRacks.addAll(resolveRacks(req.getNodes()));
-
+ Set<String> inferredRacks = resolveRacks(req.getNodes());
+ inferredRacks.removeAll(dedupedRacks);
+
+ // check that specific and non-specific requests cannot be mixed within a
+ // priority
+ checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST,
+ req.getRelaxLocality());
+ // check that specific rack cannot be mixed with specific node within a
+ // priority. If node and its rack are both specified then they must be
+ // in the same request.
+ // For explicitly requested racks, we set locality relaxation to true
+ checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true);
+ checkLocalityRelaxationConflict(req.getPriority(), inferredRacks,
+ req.getRelaxLocality());
+
if (req.getNodes() != null) {
HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
if(dedupedNodes.size() != req.getNodes().size()) {
@@ -345,21 +363,26 @@ public class AMRMClientImpl<T extends Co
+ joiner.join(req.getNodes()));
}
for (String node : dedupedNodes) {
- // Ensure node requests are accompanied by requests for
- // corresponding rack
addResourceRequest(req.getPriority(), node, req.getCapability(),
- req.getContainerCount(), req);
+ req.getContainerCount(), req, true);
}
}
- for (String rack : allRacks) {
+ for (String rack : dedupedRacks) {
addResourceRequest(req.getPriority(), rack, req.getCapability(),
- req.getContainerCount(), req);
+ req.getContainerCount(), req, true);
+ }
+
+ // Ensure node requests are accompanied by requests for
+ // corresponding rack
+ for (String rack : inferredRacks) {
+ addResourceRequest(req.getPriority(), rack, req.getCapability(),
+ req.getContainerCount(), req, req.getRelaxLocality());
}
// Off-switch
addResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getCapability(),
- req.getContainerCount(), req);
+ req.getContainerCount(), req, req.getRelaxLocality());
}
@Override
@@ -428,7 +451,8 @@ public class AMRMClientImpl<T extends Co
}
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
- if (resourceRequestInfo != null) {
+ if (resourceRequestInfo != null &&
+ !resourceRequestInfo.containerRequests.isEmpty()) {
list.add(resourceRequestInfo.containerRequests);
return list;
}
@@ -438,7 +462,8 @@ public class AMRMClientImpl<T extends Co
SortedMap<Resource, ResourceRequestInfo> tailMap =
reqMap.tailMap(capability);
for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
- if(canFit(entry.getKey(), capability)) {
+ if (canFit(entry.getKey(), capability) &&
+ !entry.getValue().containerRequests.isEmpty()) {
// match found that fits in the larger resource
list.add(entry.getValue().containerRequests);
}
@@ -466,6 +491,33 @@ public class AMRMClientImpl<T extends Co
return racks;
}
+ /**
+ * ContainerRequests with locality relaxation cannot be made at the same
+ * priority as ContainerRequests without locality relaxation.
+ */
+ private void checkLocalityRelaxationConflict(Priority priority,
+ Collection<String> locations, boolean relaxLocality) {
+ Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
+ this.remoteRequestsTable.get(priority);
+ if (remoteRequests == null) {
+ return;
+ }
+ // Locality relaxation will be set to relaxLocality for all implicitly
+ // requested racks. Make sure that existing rack requests match this.
+ for (String location : locations) {
+ TreeMap<Resource, ResourceRequestInfo> reqs =
+ remoteRequests.get(location);
+ if (reqs != null && !reqs.isEmpty()
+ && reqs.values().iterator().next().remoteRequest.getRelaxLocality()
+ != relaxLocality) {
+ throw new InvalidContainerRequestException("Cannot submit a "
+ + "ContainerRequest asking for location " + location
+ + " with locality relaxation " + relaxLocality + " when it has "
+ + "already been requested with locality relaxation " +
relaxLocality);
+ }
+ }
+ }
+
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
@@ -484,7 +536,7 @@ public class AMRMClientImpl<T extends Co
}
private void addResourceRequest(Priority priority, String resourceName,
- Resource capability, int containerCount, T req) {
+ Resource capability, int containerCount, T req, boolean relaxLocality) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
@@ -506,14 +558,15 @@ public class AMRMClientImpl<T extends Co
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (resourceRequestInfo == null) {
resourceRequestInfo =
- new ResourceRequestInfo(priority, resourceName, capability);
+ new ResourceRequestInfo(priority, resourceName, capability,
+ relaxLocality);
reqMap.put(capability, resourceRequestInfo);
}
resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() +
containerCount);
- if(req instanceof StoredContainerRequest) {
+ if (req instanceof StoredContainerRequest && relaxLocality) {
resourceRequestInfo.containerRequests.add(req);
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1503527&r1=1503526&r2=1503527&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
Tue Jul 16 00:28:29 2013
@@ -83,6 +83,7 @@ public class TestAMRMClient {
static Resource capability;
static Priority priority;
+ static Priority priority2;
static String node;
static String rack;
static String[] nodes;
@@ -105,6 +106,7 @@ public class TestAMRMClient {
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
priority = Priority.newInstance(1);
+ priority2 = Priority.newInstance(2);
capability = Resource.newInstance(1024, 1);
node = nodeReports.get(0).getNodeId().getHost();
@@ -181,6 +183,7 @@ public class TestAMRMClient {
Resource capability4 = Resource.newInstance(2000, 1);
Resource capability5 = Resource.newInstance(1000, 3);
Resource capability6 = Resource.newInstance(2000, 1);
+ Resource capability7 = Resource.newInstance(2000, 1);
StoredContainerRequest storedContainer1 =
new StoredContainerRequest(capability1, nodes, racks, priority);
@@ -194,12 +197,15 @@ public class TestAMRMClient {
new StoredContainerRequest(capability5, nodes, racks, priority);
StoredContainerRequest storedContainer6 =
new StoredContainerRequest(capability6, nodes, racks, priority);
+ StoredContainerRequest storedContainer7 =
+ new StoredContainerRequest(capability7, nodes, racks, priority2,
false);
amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer2);
amClient.addContainerRequest(storedContainer3);
amClient.addContainerRequest(storedContainer4);
amClient.addContainerRequest(storedContainer5);
amClient.addContainerRequest(storedContainer6);
+ amClient.addContainerRequest(storedContainer7);
// test matching of containers
List<? extends Collection<StoredContainerRequest>> matches;
@@ -249,6 +255,15 @@ public class TestAMRMClient {
matches = amClient.getMatchingRequests(priority, node, testCapability5);
assert(matches.size() == 0);
+ // verify requests without relaxed locality are only returned at specific
+ // locations
+ Resource testCapability7 = Resource.newInstance(2000, 1);
+ matches = amClient.getMatchingRequests(priority2, ResourceRequest.ANY,
+ testCapability7);
+ assert(matches.size() == 0);
+ matches = amClient.getMatchingRequests(priority2, node, testCapability7);
+ assert(matches.size() == 1);
+
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java?rev=1503527&r1=1503526&r2=1503527&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
Tue Jul 16 00:28:29 2013
@@ -24,12 +24,15 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.junit.Test;
@@ -52,11 +55,159 @@ public class TestAMRMClientContainerRequ
new ContainerRequest(capability, new String[] {"host1", "host2"},
new String[] {"/rack2"}, Priority.newInstance(1), 4);
client.addContainerRequest(request);
- verifyResourceRequestLocation(client, request, "host1");
- verifyResourceRequestLocation(client, request, "host2");
- verifyResourceRequestLocation(client, request, "/rack1");
- verifyResourceRequestLocation(client, request, "/rack2");
- verifyResourceRequestLocation(client, request, ResourceRequest.ANY);
+ verifyResourceRequest(client, request, "host1", true);
+ verifyResourceRequest(client, request, "host2", true);
+ verifyResourceRequest(client, request, "/rack1", true);
+ verifyResourceRequest(client, request, "/rack2", true);
+ verifyResourceRequest(client, request, ResourceRequest.ANY, true);
+ }
+
+ @Test
+ public void testDisableLocalityRelaxation() {
+ AMRMClientImpl<ContainerRequest> client = new
AMRMClientImpl<ContainerRequest>(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+ Configuration conf = new Configuration();
+ conf.setClass(
+
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ client.init(conf);
+
+ Resource capability = Resource.newInstance(1024, 1);
+ ContainerRequest nodeLevelRequest =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ null, Priority.newInstance(1), 4, false);
+ client.addContainerRequest(nodeLevelRequest);
+
+ verifyResourceRequest(client, nodeLevelRequest, ResourceRequest.ANY,
false);
+ verifyResourceRequest(client, nodeLevelRequest, "/rack1", false);
+ verifyResourceRequest(client, nodeLevelRequest, "host1", true);
+ verifyResourceRequest(client, nodeLevelRequest, "host2", true);
+
+ // Make sure we don't get any errors with two node-level requests at the
+ // same priority
+ ContainerRequest nodeLevelRequest2 =
+ new ContainerRequest(capability, new String[] {"host2", "host3"},
+ null, Priority.newInstance(1), 4, false);
+ client.addContainerRequest(nodeLevelRequest2);
+
+ AMRMClient.ContainerRequest rackLevelRequest =
+ new AMRMClient.ContainerRequest(capability, null,
+ new String[] {"/rack3", "/rack4"}, Priority.newInstance(2), 3,
false);
+ client.addContainerRequest(rackLevelRequest);
+
+ verifyResourceRequest(client, rackLevelRequest, ResourceRequest.ANY,
false);
+ verifyResourceRequest(client, rackLevelRequest, "/rack3", true);
+ verifyResourceRequest(client, rackLevelRequest, "/rack4", true);
+
+ // Make sure we don't get any errors with two rack-level requests at the
+ // same priority
+ AMRMClient.ContainerRequest rackLevelRequest2 =
+ new AMRMClient.ContainerRequest(capability, null,
+ new String[] {"/rack4", "/rack5"}, Priority.newInstance(2), 3,
false);
+ client.addContainerRequest(rackLevelRequest2);
+
+ ContainerRequest bothLevelRequest =
+ new ContainerRequest(capability, new String[] {"host3", "host4"},
+ new String[] {"rack1", "/otherrack"},
+ Priority.newInstance(3), 4, false);
+ client.addContainerRequest(bothLevelRequest);
+
+ verifyResourceRequest(client, bothLevelRequest, ResourceRequest.ANY,
false);
+ verifyResourceRequest(client, bothLevelRequest, "rack1",
+ true);
+ verifyResourceRequest(client, bothLevelRequest, "/otherrack",
+ true);
+ verifyResourceRequest(client, bothLevelRequest, "host3", true);
+ verifyResourceRequest(client, bothLevelRequest, "host4", true);
+
+ // Make sure we don't get any errors with two both-level requests at the
+ // same priority
+ ContainerRequest bothLevelRequest2 =
+ new ContainerRequest(capability, new String[] {"host4", "host5"},
+ new String[] {"rack1", "/otherrack2"},
+ Priority.newInstance(3), 4, false);
+ client.addContainerRequest(bothLevelRequest2);
+ }
+
+ @Test (expected = InvalidContainerRequestException.class)
+ public void testDifferentLocalityRelaxationSamePriority() {
+ AMRMClientImpl<ContainerRequest> client = new
AMRMClientImpl<ContainerRequest>(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+ Configuration conf = new Configuration();
+ conf.setClass(
+
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ client.init(conf);
+
+ Resource capability = Resource.newInstance(1024, 1);
+ ContainerRequest request1 =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ null, Priority.newInstance(1), 4, false);
+ client.addContainerRequest(request1);
+ ContainerRequest request2 =
+ new ContainerRequest(capability, new String[] {"host3"},
+ null, Priority.newInstance(1), 4, true);
+ client.addContainerRequest(request2);
+ }
+
+ @Test
+ public void testInvalidValidWhenOldRemoved() {
+ AMRMClientImpl<ContainerRequest> client = new
AMRMClientImpl<ContainerRequest>(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+ Configuration conf = new Configuration();
+ conf.setClass(
+
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ client.init(conf);
+
+ Resource capability = Resource.newInstance(1024, 1);
+ ContainerRequest request1 =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ null, Priority.newInstance(1), 4, false);
+ client.addContainerRequest(request1);
+
+ client.removeContainerRequest(request1);
+
+ ContainerRequest request2 =
+ new ContainerRequest(capability, new String[] {"host3"},
+ null, Priority.newInstance(1), 4, true);
+ client.addContainerRequest(request2);
+
+ client.removeContainerRequest(request2);
+
+ ContainerRequest request3 =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ null, Priority.newInstance(1), 4, false);
+ client.addContainerRequest(request3);
+
+ client.removeContainerRequest(request3);
+
+ ContainerRequest request4 =
+ new ContainerRequest(capability, null,
+ new String[] {"rack1"}, Priority.newInstance(1), 4, true);
+ client.addContainerRequest(request4);
+
+ }
+
+ @Test (expected = InvalidContainerRequestException.class)
+ public void testLocalityRelaxationDifferentLevels() {
+ AMRMClientImpl<ContainerRequest> client = new
AMRMClientImpl<ContainerRequest>(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+ Configuration conf = new Configuration();
+ conf.setClass(
+
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ client.init(conf);
+
+ Resource capability = Resource.newInstance(1024, 1);
+ ContainerRequest request1 =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ null, Priority.newInstance(1), 4, false);
+ client.addContainerRequest(request1);
+ ContainerRequest request2 =
+ new ContainerRequest(capability, null,
+ new String[] {"rack1"}, Priority.newInstance(1), 4, true);
+ client.addContainerRequest(request2);
}
private static class MyResolver implements DNSToSwitchMapping {
@@ -70,12 +221,13 @@ public class TestAMRMClientContainerRequ
public void reloadCachedMappings() {}
}
- private void verifyResourceRequestLocation(
+ private void verifyResourceRequest(
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
- String location) {
+ String location, boolean expectedRelaxLocality) {
ResourceRequest ask =
client.remoteRequestsTable.get(request.getPriority())
.get(location).get(request.getCapability()).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(request.getContainerCount(), ask.getNumContainers());
+ assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
}
}