Author: sandy
Date: Thu Nov 14 22:13:07 2013
New Revision: 1542106
URL: http://svn.apache.org/r1542106
Log:
YARN-1392: Add new files
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java?rev=1542106&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
Thu Nov 14 22:13:07 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+/**
+ * An ordered chain of {@link QueuePlacementRule}s used to pick the queue that
+ * a submitted app is placed into.
+ *
+ * Rules are tried in order. Every rule except the last must be non-terminal
+ * and the last rule must be terminal, so that applying the policy always ends
+ * with either a queue name or a rejection.
+ */
+public class QueuePlacementPolicy {
+  /** Maps the tag name of a rule's xml element to the implementing class. */
+  private static final Map<String, Class<? extends QueuePlacementRule>>
+      ruleClasses;
+  static {
+    Map<String, Class<? extends QueuePlacementRule>> map =
+        new HashMap<String, Class<? extends QueuePlacementRule>>();
+    map.put("user", QueuePlacementRule.User.class);
+    map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
+    map.put("specified", QueuePlacementRule.Specified.class);
+    map.put("default", QueuePlacementRule.Default.class);
+    map.put("reject", QueuePlacementRule.Reject.class);
+    ruleClasses = Collections.unmodifiableMap(map);
+  }
+
+  private final List<QueuePlacementRule> rules;
+  private final Set<String> configuredQueues;
+  private final Groups groups;
+
+  /**
+   * Creates a policy after validating the ordering of its rules.
+   *
+   * @param rules
+   *    The rules to apply, in order. Must not be empty.
+   * @param configuredQueues
+   *    The queue names handed to every rule when the policy is applied.
+   * @param conf
+   *    The configuration used to construct the {@link Groups} instance.
+   * @throws AllocationConfigurationException
+   *    If there are no rules, if a terminal rule is followed by further
+   *    rules, or if the last rule is not terminal.
+   */
+  public QueuePlacementPolicy(List<QueuePlacementRule> rules,
+      Set<String> configuredQueues, Configuration conf)
+      throws AllocationConfigurationException {
+    // Without this guard the last-rule check below would index into an empty
+    // list and fail with an unchecked IndexOutOfBoundsException instead of
+    // reporting a configuration problem.
+    if (rules.isEmpty()) {
+      throw new AllocationConfigurationException(
+          "Queue placement policy must contain at least one rule");
+    }
+    for (int i = 0; i < rules.size()-1; i++) {
+      if (rules.get(i).isTerminal()) {
+        throw new AllocationConfigurationException("Rules after rule "
+            + i + " in queue placement policy can never be reached");
+      }
+    }
+    if (!rules.get(rules.size()-1).isTerminal()) {
+      throw new AllocationConfigurationException(
+          "Could get past last queue placement rule without assigning");
+    }
+    this.rules = rules;
+    this.configuredQueues = configuredQueues;
+    groups = new Groups(conf);
+  }
+
+  /**
+   * Builds a QueuePlacementPolicy from an xml element.
+   *
+   * Each child element is turned into one rule, chosen by the child's tag
+   * name; child nodes that are not elements are skipped.
+   *
+   * @param el
+   *    The xml element whose child elements describe the rules.
+   * @param configuredQueues
+   *    The queue names handed to every rule when the policy is applied.
+   * @param conf
+   *    The configuration passed on to the constructor.
+   * @throws AllocationConfigurationException
+   *    If a child's tag name does not correspond to a known rule, or if the
+   *    resulting list of rules is rejected by the constructor.
+   */
+  public static QueuePlacementPolicy fromXml(Element el,
+      Set<String> configuredQueues, Configuration conf)
+      throws AllocationConfigurationException {
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    NodeList elements = el.getChildNodes();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (node instanceof Element) {
+        Element element = (Element)node;
+        String ruleName = element.getTagName();
+        Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
+        if (clazz == null) {
+          throw new AllocationConfigurationException("No rule class found for "
+              + ruleName);
+        }
+        QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
+        rule.initializeFromXml(element);
+        rules.add(rule);
+      }
+    }
+    return new QueuePlacementPolicy(rules, configuredQueues, conf);
+  }
+
+  /**
+   * Applies this policy to an app with the given requested queue and user
+   * information.
+   *
+   * @param requestedQueue
+   *    The queue specified in the ApplicationSubmissionContext
+   * @param user
+   *    The user submitting the app
+   * @return
+   *    The name of the queue to assign the app to. Or null if the app should
+   *    be rejected.
+   * @throws IOException
+   *    If an exception is encountered while getting the user's groups
+   */
+  public String assignAppToQueue(String requestedQueue, String user)
+      throws IOException {
+    for (QueuePlacementRule rule : rules) {
+      String queue = rule.assignAppToQueue(requestedQueue, user, groups,
+          configuredQueues);
+      // An empty string means "try the next rule"; null (reject) and any
+      // non-empty queue name are final answers.
+      if (queue == null || !queue.isEmpty()) {
+        return queue;
+      }
+    }
+    throw new IllegalStateException("Should have applied a rule before " +
+        "reaching here");
+  }
+}
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java?rev=1542106&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
Thu Nov 14 22:13:07 2013
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+
+/**
+ * A single step of a queue placement policy. A rule proposes a queue for an
+ * app; the create flag decides whether a proposed queue that is not among
+ * the configured queues is still accepted.
+ */
+public abstract class QueuePlacementRule {
+  protected boolean create;
+
+  /**
+   * Initializes the rule with any arguments.
+   *
+   * @param create
+   *    Whether a proposed queue is accepted even when it is not one of the
+   *    configured queues.
+   * @param args
+   *    Additional attributes of the rule's xml element other than create.
+   *    This base implementation does not read them.
+   * @return
+   *    This rule.
+   */
+  public QueuePlacementRule initialize(boolean create,
+      Map<String, String> args) {
+    this.create = create;
+    return this;
+  }
+
+  /**
+   * Asks the rule for a queue and accepts the answer only if the rule may
+   * create queues or the answer is one of the configured queues.
+   *
+   * @param requestedQueue
+   *    The queue explicitly requested.
+   * @param user
+   *    The user submitting the app.
+   * @param groups
+   *    The groups of the user submitting the app.
+   * @param configuredQueues
+   *    The queues specified in the scheduler configuration.
+   * @return
+   *    The queue to place the app into. An empty string indicates that we
+   *    should continue to the next rule, and null indicates that the app
+   *    should be rejected.
+   */
+  public String assignAppToQueue(String requestedQueue, String user,
+      Groups groups, Collection<String> configuredQueues) throws IOException {
+    String candidate = getQueueForApp(requestedQueue, user, groups);
+    boolean accepted = create || configuredQueues.contains(candidate);
+    return accepted ? candidate : "";
+  }
+
+  /**
+   * Initializes the rule from the attributes of its xml element. The
+   * "create" attribute is parsed as a boolean and defaults to true when
+   * absent; every other attribute is passed on as an argument.
+   *
+   * @param el
+   *    The xml element describing this rule.
+   */
+  public void initializeFromXml(Element el) {
+    Map<String, String> extraArgs = new HashMap<String, String>();
+    boolean createFlag = true;
+    NamedNodeMap attrs = el.getAttributes();
+    for (int idx = 0; idx < attrs.getLength(); idx++) {
+      Node attr = attrs.item(idx);
+      String name = attr.getNodeName();
+      String value = attr.getNodeValue();
+      if (!name.equals("create")) {
+        extraArgs.put(name, value);
+        continue;
+      }
+      createFlag = Boolean.parseBoolean(value);
+    }
+    initialize(createFlag, extraArgs);
+  }
+
+  /**
+   * Returns true if this rule never tells the policy to continue.
+   */
+  public abstract boolean isTerminal();
+
+  /**
+   * Applies this rule to an app with the given requested queue and user/group
+   * information.
+   *
+   * @param requestedQueue
+   *    The queue specified in the ApplicationSubmissionContext
+   * @param user
+   *    The user submitting the app.
+   * @param groups
+   *    The groups of the user submitting the app.
+   * @return
+   *    The name of the queue to assign the app to, or an empty string to
+   *    continue to the next rule.
+   */
+  protected abstract String getQueueForApp(String requestedQueue, String user,
+      Groups groups) throws IOException;
+
+  /**
+   * Places apps in queues by username of the submitter
+   */
+  public static class User extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue,
+        String user, Groups groups) {
+      String userQueue = "root." + user;
+      return userQueue;
+    }
+
+    /** Terminal exactly when the rule is allowed to create queues. */
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+
+  /**
+   * Places apps in queues by primary group of the submitter
+   */
+  public static class PrimaryGroup extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue,
+        String user, Groups groups) throws IOException {
+      // The first group returned for the user is used as the queue name.
+      String primaryGroup = groups.getGroups(user).get(0);
+      return "root." + primaryGroup;
+    }
+
+    /** Terminal exactly when the rule is allowed to create queues. */
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+
+  /**
+   * Places apps in queues by requested queue of the submitter
+   */
+  public static class Specified extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue,
+        String user, Groups groups) {
+      // A request for the default queue name is passed on to the next rule.
+      if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
+        return "";
+      }
+      // Qualify the name with the root prefix unless it already has one.
+      return requestedQueue.startsWith("root.")
+          ? requestedQueue
+          : "root." + requestedQueue;
+    }
+
+    /** Never terminal: a request for the default queue always falls through. */
+    @Override
+    public boolean isTerminal() {
+      return false;
+    }
+  }
+
+  /**
+   * Places all apps in the default queue
+   */
+  public static class Default extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups) {
+      String defaultQueue = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+      return defaultQueue;
+    }
+
+    /** Terminal exactly when the rule is allowed to create queues. */
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+
+  /**
+   * Rejects all apps
+   */
+  public static class Reject extends QueuePlacementRule {
+    /** Always returns null, which tells the policy to reject the app. */
+    @Override
+    public String assignAppToQueue(String requestedQueue, String user,
+        Groups groups, Collection<String> configuredQueues) {
+      return null;
+    }
+
+    /** Not supported; assignAppToQueue above never calls it. */
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isTerminal() {
+      return true;
+    }
+  }
+}
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java?rev=1542106&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
Thu Nov 14 22:13:07 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+
+public class SimpleGroupsMapping implements GroupMappingServiceProvider {
+
+ @Override
+ public List<String> getGroups(String user) {
+ return Arrays.asList(user + "group");
+ }
+
+ @Override
+ public void cacheGroupsRefresh() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cacheGroupsAdd(List<String> groups) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+}
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java?rev=1542106&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
Thu Nov 14 22:13:07 2013
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Tests for building a QueuePlacementPolicy from xml and for the queue it
+ * picks for different requested queue / user combinations.
+ */
+public class TestQueuePlacementPolicy {
+  private final static Configuration conf = new Configuration();
+  private final static Set<String> configuredQueues =
+      Sets.newHashSet("root.someuser");
+
+  /** Makes group lookups go through SimpleGroupsMapping. */
+  @BeforeClass
+  public static void setup() {
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+  }
+
+  /** Specified queue wins; the default queue falls through to the user. */
+  @Test
+  public void testSpecifiedUserPolicy() throws Exception {
+    QueuePlacementPolicy policy =
+        parse(policyXml(" <specified />", " <user />"));
+    assertEquals("root.specifiedq",
+        policy.assignAppToQueue("specifiedq", "someuser"));
+    assertEquals("root.someuser",
+        policy.assignAppToQueue("default", "someuser"));
+    assertEquals("root.otheruser",
+        policy.assignAppToQueue("default", "otheruser"));
+  }
+
+  /** A user rule with create=false only matches already configured queues. */
+  @Test
+  public void testNoCreate() throws Exception {
+    QueuePlacementPolicy policy = parse(policyXml(
+        " <specified />", " <user create=\"false\" />", " <default />"));
+    assertEquals("root.specifiedq",
+        policy.assignAppToQueue("specifiedq", "someuser"));
+    assertEquals("root.someuser",
+        policy.assignAppToQueue("default", "someuser"));
+    assertEquals("root.specifiedq",
+        policy.assignAppToQueue("specifiedq", "otheruser"));
+    assertEquals("root.default",
+        policy.assignAppToQueue("default", "otheruser"));
+  }
+
+  /** Apps that do not name a queue are rejected (null). */
+  @Test
+  public void testSpecifiedThenReject() throws Exception {
+    QueuePlacementPolicy policy =
+        parse(policyXml(" <specified />", " <reject />"));
+    assertEquals("root.specifiedq",
+        policy.assignAppToQueue("specifiedq", "someuser"));
+    assertEquals(null, policy.assignAppToQueue("default", "someuser"));
+  }
+
+  /** A policy whose last rule is not terminal must be refused. */
+  @Test (expected = AllocationConfigurationException.class)
+  public void testOmittedTerminalRule() throws Exception {
+    parse(policyXml(" <specified />", " <user create=\"false\" />"));
+  }
+
+  /** A terminal rule followed by another rule must be refused. */
+  @Test (expected = AllocationConfigurationException.class)
+  public void testTerminalRuleInMiddle() throws Exception {
+    parse(policyXml(" <specified />", " <default />", " <user />"));
+  }
+
+  /**
+   * Wraps the given rule elements, in order, in a queuePlacementPolicy
+   * element.
+   */
+  private static String policyXml(String... ruleElements) {
+    StringBuffer xml = new StringBuffer();
+    xml.append("<queuePlacementPolicy>");
+    for (String ruleElement : ruleElements) {
+      xml.append(ruleElement);
+    }
+    xml.append("</queuePlacementPolicy>");
+    return xml.toString();
+  }
+
+  /** Parses the xml string and builds a policy from its root element. */
+  private QueuePlacementPolicy parse(String str) throws Exception {
+    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+    factory.setIgnoringComments(true);
+    DocumentBuilder docBuilder = factory.newDocumentBuilder();
+    Document parsed = docBuilder.parse(IOUtils.toInputStream(str));
+    Element policyElement = parsed.getDocumentElement();
+    return QueuePlacementPolicy.fromXml(policyElement, configuredQueues, conf);
+  }
+}