Author: tucu
Date: Sun Jun 16 22:11:38 2013
New Revision: 1493599
URL: http://svn.apache.org/r1493599
Log:
YARN-752. In AMRMClient, automatically add corresponding rack requests for
requested nodes. (sandyr via tucu)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1493599&r1=1493598&r2=1493599&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sun Jun 16 22:11:38 2013
@@ -372,6 +372,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-693. Modified RM to send NMTokens on allocate call so that AMs can
then
use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
+ YARN-752. In AMRMClient, automatically add corresponding rack requests for
+ requested nodes. (sandyr via tucu)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1493599&r1=1493598&r2=1493599&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
Sun Jun 16 22:11:38 2013
@@ -42,23 +42,36 @@ import com.google.common.collect.Immutab
public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends
Service {
/**
- * Object to represent container request for resources.
- * Resources may be localized to nodes and racks.
- * Resources may be assigned priorities.
- * All getters return unmodifiable collections.
- * Can ask for multiple containers of a given type.
+ * Object to represent container request for resources. Scheduler
+ * documentation should be consulted for the specifics of how the parameters
+ * are honored.
+ * All getters return immutable values.
+ *
+ * @param capability
+ * The {@link Resource} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The racks
+ * corresponding to any hosts requested will be automatically added to
+ * this list.
+ * @param priority
+ * The priority at which to request the containers. Higher priorities have
+ * lower numerical values.
+ * @param containerCount
+ * The number of containers to request.
*/
public static class ContainerRequest {
final Resource capability;
- final ImmutableList<String> hosts;
- final ImmutableList<String> racks;
+ final List<String> nodes;
+ final List<String> racks;
final Priority priority;
final int containerCount;
- public ContainerRequest(Resource capability, String[] hosts,
+ public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, int containerCount) {
this.capability = capability;
- this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
+ this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
this.priority = priority;
this.containerCount = containerCount;
@@ -68,8 +81,8 @@ public interface AMRMClient<T extends AM
return capability;
}
- public List<String> getHosts() {
- return hosts;
+ public List<String> getNodes() {
+ return nodes;
}
public List<String> getRacks() {
@@ -103,9 +116,9 @@ public interface AMRMClient<T extends AM
* AMRMClient can remove it from its internal store.
*/
public static class StoredContainerRequest extends ContainerRequest {
- public StoredContainerRequest(Resource capability, String[] hosts,
+ public StoredContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority) {
- super(capability, hosts, racks, priority, 1);
+ super(capability, nodes, racks, priority, 1);
}
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1493599&r1=1493598&r2=1493599&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
Sun Jun 16 22:11:38 2013
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -64,6 +65,9 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+
+import com.google.common.base.Joiner;
import com.google.common.annotations.VisibleForTesting;
@@ -139,7 +143,7 @@ public class AMRMClientImpl<T extends Co
//Key -> Priority
//Value -> Map
- //Key->ResourceName (e.g., hostname, rackname, *)
+ //Key->ResourceName (e.g., nodename, rackname, *)
//Value->Map
//Key->Resource Capability
//Value->ResourceRequest
@@ -160,6 +164,7 @@ public class AMRMClientImpl<T extends Co
@Override
protected void serviceInit(Configuration conf) throws Exception {
+ RackResolver.init(conf);
super.serviceInit(conf);
}
@@ -309,22 +314,37 @@ public class AMRMClientImpl<T extends Co
@Override
public synchronized void addContainerRequest(T req) {
- // Create resource requests
- // add check for dup locations
- if (req.hosts != null) {
- for (String host : req.hosts) {
- addResourceRequest(req.priority, host, req.capability,
- req.containerCount, req);
+ Set<String> allRacks = new HashSet<String>();
+ if (req.racks != null) {
+ allRacks.addAll(req.racks);
+ if(req.racks.size() != allRacks.size()) {
+ Joiner joiner = Joiner.on(',');
+ LOG.warn("ContainerRequest has duplicate racks: "
+ + joiner.join(req.racks));
}
}
-
- if (req.racks != null) {
- for (String rack : req.racks) {
- addResourceRequest(req.priority, rack, req.capability,
+ allRacks.addAll(resolveRacks(req.nodes));
+
+ if (req.nodes != null) {
+ HashSet<String> dedupedNodes = new HashSet<String>(req.nodes);
+ if(dedupedNodes.size() != req.nodes.size()) {
+ Joiner joiner = Joiner.on(',');
+ LOG.warn("ContainerRequest has duplicate nodes: "
+ + joiner.join(req.nodes));
+ }
+ for (String node : dedupedNodes) {
+ // Request each distinct node once; the racks these nodes resolve to
+ // are requested below from allRacks
+ addResourceRequest(req.priority, node, req.capability,
req.containerCount, req);
}
}
+ for (String rack : allRacks) {
+ addResourceRequest(req.priority, rack, req.capability,
+ req.containerCount, req);
+ }
+
// Off-switch
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.containerCount, req);
@@ -332,19 +352,23 @@ public class AMRMClientImpl<T extends Co
@Override
public synchronized void removeContainerRequest(T req) {
+ Set<String> allRacks = new HashSet<String>();
+ if (req.racks != null) {
+ allRacks.addAll(req.racks);
+ }
+ allRacks.addAll(resolveRacks(req.nodes));
+
// Update resource requests
- if (req.hosts != null) {
- for (String hostName : req.hosts) {
- decResourceRequest(req.priority, hostName, req.capability,
+ if (req.nodes != null) {
+ for (String node : new HashSet<String>(req.nodes)) {
+ decResourceRequest(req.priority, node, req.capability,
req.containerCount, req);
}
}
- if (req.racks != null) {
- for (String rack : req.racks) {
- decResourceRequest(req.priority, rack, req.capability,
- req.containerCount, req);
- }
+ for (String rack : allRacks) {
+ decResourceRequest(req.priority, rack, req.capability,
+ req.containerCount, req);
}
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
@@ -404,6 +428,24 @@ public class AMRMClientImpl<T extends Co
return list;
}
+  /**
+   * Looks up the rack of every given node through {@link RackResolver}.
+   *
+   * @param nodes node names to resolve; may be {@code null}
+   * @return a newly created set of the distinct racks that were resolved;
+   *         empty when {@code nodes} is {@code null} or has no entries
+   */
+  private Set<String> resolveRacks(List<String> nodes) {
+    Set<String> resolved = new HashSet<String>();
+    if (nodes == null) {
+      return resolved;
+    }
+
+    for (String nodeName : nodes) {
+      // Ensure node requests are accompanied by requests for
+      // corresponding rack
+      String location = RackResolver.resolve(nodeName).getNetworkLocation();
+      if (location != null) {
+        resolved.add(location);
+      } else {
+        LOG.warn("Failed to resolve rack for node " + nodeName + ".");
+      }
+    }
+
+    return resolved;
+  }
+
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1493599&r1=1493598&r2=1493599&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
Sun Jun 16 22:11:38 2013
@@ -267,6 +267,51 @@ public class TestAMRMClient {
assertTrue(matches.size() == 1);
assertTrue(matches.get(0).size() == matchSize);
}
+
+  /**
+   * A stored request created with nodes but a {@code null} rack list must be
+   * returned when matching by the test's {@code node} and by its
+   * {@code rack}, and must no longer match by rack once it is removed.
+   */
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFitInferredRack()
+      throws YarnException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      Resource capability = Resource.newInstance(1024, 2);
+
+      // only nodes are supplied; the racks argument is null
+      StoredContainerRequest nodeOnlyRequest =
+          new StoredContainerRequest(capability, nodes, null, priority);
+      amClient.addContainerRequest(nodeOnlyRequest);
+
+      // verify matching with original node and inferred rack
+      // exact match node
+      List<? extends Collection<StoredContainerRequest>> nodeMatches =
+          amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(nodeMatches, 1);
+      assertTrue(nodeOnlyRequest == nodeMatches.get(0).iterator().next());
+
+      // inferred match rack
+      List<? extends Collection<StoredContainerRequest>> rackMatches =
+          amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(rackMatches, 1);
+      assertTrue(nodeOnlyRequest == rackMatches.get(0).iterator().next());
+
+      // inferred rack match no longer valid after request is removed
+      amClient.removeContainerRequest(nodeOnlyRequest);
+      assertTrue(
+          amClient.getMatchingRequests(priority, rack, capability).isEmpty());
+
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
@Test (timeout=60000)
public void testAMRMClientMatchStorage() throws YarnException, IOException {
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java?rev=1493599&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
Sun Jun 16 22:11:38 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.client.AMRMClientImpl.ContainerRequest;
+import static org.junit.Assert.assertEquals;
+
+public class TestAMRMClientContainerRequest {
+ @Test
+ public void testFillInRacks() {
+ AMRMClientImpl<ContainerRequest> client = new
AMRMClientImpl<ContainerRequest>(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+
+ Configuration conf = new Configuration();
+ conf.setClass(
+
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ client.init(conf);
+
+ Resource capability = Resource.newInstance(1024, 1);
+ ContainerRequest request =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ new String[] {"/rack2"}, Priority.newInstance(1), 4);
+ client.addContainerRequest(request);
+ verifyResourceRequestLocation(client, request, "host1");
+ verifyResourceRequestLocation(client, request, "host2");
+ verifyResourceRequestLocation(client, request, "/rack1");
+ verifyResourceRequestLocation(client, request, "/rack2");
+ verifyResourceRequestLocation(client, request, ResourceRequest.ANY);
+ }
+
+ private static class MyResolver implements DNSToSwitchMapping {
+
+ @Override
+ public List<String> resolve(List<String> names) {
+ return Arrays.asList("/rack1");
+ }
+
+ @Override
+ public void reloadCachedMappings() {}
+ }
+
+ private void verifyResourceRequestLocation(
+ AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
+ String location) {
+ ResourceRequest ask = client.remoteRequestsTable.get(request.priority)
+ .get(location).get(request.capability).remoteRequest;
+ assertEquals(location, ask.getResourceName());
+ assertEquals(request.getContainerCount(), ask.getNumContainers());
+ }
+}