YARN-1645. ContainerManager implementation to support container resizing. 
Contributed by Meng Ding & Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2934ff34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2934ff34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2934ff34

Branch: refs/heads/YARN-1197
Commit: 2934ff34531dd0d9c84985aa5e773d03685df139
Parents: 2d8741a
Author: Jian He <jia...@apache.org>
Authored: Tue Jul 21 16:10:40 2015 -0700
Committer: Jian He <jia...@apache.org>
Committed: Tue Jul 21 16:11:49 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../CMgrDecreaseContainersResourceEvent.java    |  37 ++++
 .../nodemanager/ContainerManagerEventType.java  |   1 +
 .../containermanager/ContainerManagerImpl.java  | 180 ++++++++++++++++--
 .../container/ChangeContainerResourceEvent.java |  36 ++++
 .../container/ContainerEventType.java           |   4 +
 .../nodemanager/DummyContainerManager.java      |   6 +-
 .../TestContainerManagerWithLCE.java            |  22 +++
 .../BaseContainerManagerTest.java               |  43 ++++-
 .../containermanager/TestContainerManager.java  | 190 ++++++++++++++++++-
 10 files changed, 486 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e0a6cc8..2b735cd 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -150,6 +150,9 @@ Release 2.8.0 - UNRELEASED
     YARN-1449. AM-NM protocol changes to support container resizing.
     (Meng Ding & Wangda Tan via jianhe)
 
+    YARN-1645. ContainerManager implementation to support container resizing.
+    (Meng Ding & Wangda Tan via jianhe)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java
new file mode 100644
index 0000000..9479d0b
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import java.util.List;
+
+public class CMgrDecreaseContainersResourceEvent extends ContainerManagerEvent 
{
+
+  private final List<Container> containersToDecrease;
+
+  public CMgrDecreaseContainersResourceEvent(List<Container>
+      containersToDecrease) {
+    super(ContainerManagerEventType.DECREASE_CONTAINERS_RESOURCE);
+    this.containersToDecrease = containersToDecrease;
+  }
+
+  public List<Container> getContainersToDecrease() {
+    return this.containersToDecrease;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
index 4278ce0..fcb0252 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
@@ -21,4 +21,5 @@ package org.apache.hadoop.yarn.server.nodemanager;
 public enum ContainerManagerEventType {
   FINISH_APPS,
   FINISH_CONTAINERS,
+  DECREASE_CONTAINERS_RESOURCE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 1082ea1..1e06919 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
@@ -96,6 +97,7 @@ import 
org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
+import 
org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -113,6 +115,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ChangeContainerResourceEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
@@ -141,6 +144,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerManagerImpl extends CompositeService implements
     ServiceStateChangeListener, ContainerManagementProtocol,
@@ -663,33 +667,45 @@ public class ContainerManagerImpl extends 
CompositeService implements
 
   /**
    * @param containerTokenIdentifier
-   *          of the container to be started
+   *          of the container whose resource is to be started or increased
    * @throws YarnException
    */
   @Private
   @VisibleForTesting
-  protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
-      ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
+  protected void authorizeStartAndResourceIncreaseRequest(
+      NMTokenIdentifier nmTokenIdentifier,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      boolean startRequest)
+      throws YarnException {
     if (nmTokenIdentifier == null) {
       throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
     }
     if (containerTokenIdentifier == null) {
       throw RPCUtil.getRemoteException(INVALID_CONTAINERTOKEN_MSG);
     }
+    /*
+     * Check the following:
+     * 1. The request comes from the same application attempt
+     * 2. The request possess a container token that has not expired
+     * 3. The request possess a container token that is granted by a known RM
+     */
     ContainerId containerId = containerTokenIdentifier.getContainerID();
     String containerIDStr = containerId.toString();
     boolean unauthorized = false;
     StringBuilder messageBuilder =
-        new StringBuilder("Unauthorized request to start container. ");
+        new StringBuilder("Unauthorized request to " + (startRequest ?
+            "start container." : "increase container resource."));
     if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().
         equals(containerId.getApplicationAttemptId().getApplicationId())) {
       unauthorized = true;
       messageBuilder.append("\nNMToken for application attempt : ")
         .append(nmTokenIdentifier.getApplicationAttemptId())
-        .append(" was used for starting container with container token")
+        .append(" was used for "
+            + (startRequest ? "starting " : "increasing resource of ")
+            + "container with container token")
         .append(" issued for application attempt : ")
         .append(containerId.getApplicationAttemptId());
-    } else if (!this.context.getContainerTokenSecretManager()
+    } else if (startRequest && !this.context.getContainerTokenSecretManager()
         .isValidStartContainerRequest(containerTokenIdentifier)) {
       // Is the container being relaunched? Or RPC layer let startCall with
       // tokens generated off old-secret through?
@@ -711,6 +727,14 @@ public class ContainerManagerImpl extends CompositeService 
implements
       LOG.error(msg);
       throw RPCUtil.getRemoteException(msg);
     }
+    if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
+        .getRMIdentifier()) {
+      // Is the container coming from unknown RM
+      StringBuilder sb = new StringBuilder("\nContainer ");
+      sb.append(containerTokenIdentifier.getContainerID().toString())
+        .append(" rejected as it is allocated by a previous RM");
+      throw new InvalidContainerException(sb.toString());
+    }
   }
 
   /**
@@ -727,7 +751,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
     }
     UserGroupInformation remoteUgi = getRemoteUgi();
     NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
-    authorizeUser(remoteUgi,nmTokenIdentifier);
+    authorizeUser(remoteUgi, nmTokenIdentifier);
     List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
     Map<ContainerId, SerializedException> failedContainers =
         new HashMap<ContainerId, SerializedException>();
@@ -817,16 +841,8 @@ public class ContainerManagerImpl extends CompositeService 
implements
      * belongs to correct Node Manager (part of retrieve password). c) It has
      * correct RMIdentifier. d) It is not expired.
      */
-    authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);
- 
-    if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
-        .getRMIdentifier()) {
-        // Is the container coming from unknown RM
-        StringBuilder sb = new StringBuilder("\nContainer ");
-        sb.append(containerTokenIdentifier.getContainerID().toString())
-          .append(" rejected as it is allocated by a previous RM");
-        throw new InvalidContainerException(sb.toString());
-    }
+    authorizeStartAndResourceIncreaseRequest(
+        nmTokenIdentifier, containerTokenIdentifier, true);
     // update NMToken
     updateNMTokenIdentifier(nmTokenIdentifier);
 
@@ -932,9 +948,118 @@ public class ContainerManagerImpl extends 
CompositeService implements
   @Override
   public IncreaseContainersResourceResponse increaseContainersResource(
       IncreaseContainersResourceRequest requests)
-      throws YarnException, IOException {
-    // To be implemented in YARN-1645
-    return null;
+          throws YarnException, IOException {
+    if (blockNewContainerRequests.get()) {
+      throw new NMNotYetReadyException(
+          "Rejecting container resource increase as NodeManager has not"
+              + " yet connected with ResourceManager");
+    }
+    UserGroupInformation remoteUgi = getRemoteUgi();
+    NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
+    authorizeUser(remoteUgi, nmTokenIdentifier);
+    List<ContainerId> successfullyIncreasedContainers
+        = new ArrayList<ContainerId>();
+    Map<ContainerId, SerializedException> failedContainers =
+        new HashMap<ContainerId, SerializedException>();
+    // Process container resource increase requests
+    for (org.apache.hadoop.yarn.api.records.Token token :
+        requests.getContainersToIncrease()) {
+      ContainerId containerId = null;
+      try {
+        if (token.getIdentifier() == null) {
+          throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+        }
+        ContainerTokenIdentifier containerTokenIdentifier =
+            BuilderUtils.newContainerTokenIdentifier(token);
+        verifyAndGetContainerTokenIdentifier(token,
+            containerTokenIdentifier);
+        authorizeStartAndResourceIncreaseRequest(
+            nmTokenIdentifier, containerTokenIdentifier, false);
+        containerId = containerTokenIdentifier.getContainerID();
+        // Reuse the startContainer logic to update NMToken,
+        // as container resource increase request will have come with
+        // an updated NMToken.
+        updateNMTokenIdentifier(nmTokenIdentifier);
+        Resource resource = containerTokenIdentifier.getResource();
+        changeContainerResourceInternal(containerId, resource, true);
+        successfullyIncreasedContainers.add(containerId);
+      } catch (YarnException | InvalidToken e) {
+        failedContainers.put(containerId, SerializedException.newInstance(e));
+      } catch (IOException e) {
+        throw RPCUtil.getRemoteException(e);
+      }
+    }
+    return IncreaseContainersResourceResponse.newInstance(
+        successfullyIncreasedContainers, failedContainers);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void changeContainerResourceInternal(
+      ContainerId containerId, Resource targetResource, boolean increase)
+          throws YarnException, IOException {
+    Container container = context.getContainers().get(containerId);
+    // Check container existence
+    if (container == null) {
+      if (nodeStatusUpdater.isContainerRecentlyStopped(containerId)) {
+        throw RPCUtil.getRemoteException("Container " + containerId.toString()
+            + " was recently stopped on node manager.");
+      } else {
+        throw RPCUtil.getRemoteException("Container " + containerId.toString()
+            + " is not handled by this NodeManager");
+      }
+    }
+    // Check container state
+    org.apache.hadoop.yarn.server.nodemanager.
+        containermanager.container.ContainerState currentState =
+        container.getContainerState();
+    if (currentState != org.apache.hadoop.yarn.server.
+        nodemanager.containermanager.container.ContainerState.RUNNING) {
+      throw RPCUtil.getRemoteException("Container " + containerId.toString()
+          + " is in " + currentState.name() + " state."
+          + " Resource can only be changed when a container is in"
+          + " RUNNING state");
+    }
+    // Check validity of the target resource.
+    Resource currentResource = container.getResource();
+    if (currentResource.equals(targetResource)) {
+      LOG.warn("Unable to change resource for container "
+          + containerId.toString()
+          + ". The target resource "
+          + targetResource.toString()
+          + " is the same as the current resource");
+      return;
+    }
+    if (increase && !Resources.fitsIn(currentResource, targetResource)) {
+      throw RPCUtil.getRemoteException("Unable to increase resource for "
+          + "container " + containerId.toString()
+          + ". The target resource "
+          + targetResource.toString()
+          + " is smaller than the current resource "
+          + currentResource.toString());
+    }
+    if (!increase &&
+        (!Resources.fitsIn(Resources.none(), targetResource)
+            || !Resources.fitsIn(targetResource, currentResource))) {
+      throw RPCUtil.getRemoteException("Unable to decrease resource for "
+          + "container " + containerId.toString()
+          + ". The target resource "
+          + targetResource.toString()
+          + " is not smaller than the current resource "
+          + currentResource.toString());
+    }
+    this.readLock.lock();
+    try {
+      if (!serviceStopped) {
+        dispatcher.getEventHandler().handle(new ChangeContainerResourceEvent(
+            containerId, targetResource));
+      } else {
+        throw new YarnException(
+            "Unable to change container resource as the NodeManager is "
+                + "in the process of shutting down");
+      }
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   @Private
@@ -1175,6 +1300,21 @@ public class ContainerManagerImpl extends 
CompositeService implements
                   "Container Killed by ResourceManager"));
       }
       break;
+    case DECREASE_CONTAINERS_RESOURCE:
+      CMgrDecreaseContainersResourceEvent containersDecreasedEvent =
+          (CMgrDecreaseContainersResourceEvent) event;
+      for (org.apache.hadoop.yarn.api.records.Container container
+          : containersDecreasedEvent.getContainersToDecrease()) {
+        try {
+          changeContainerResourceInternal(container.getId(),
+              container.getResource(), false);
+        } catch (YarnException e) {
+          LOG.error("Unable to decrease container resource", e);
+        } catch (IOException e) {
+          LOG.error("Unable to update container resource in store", e);
+        }
+      }
+      break;
     default:
         throw new YarnRuntimeException(
             "Got an unknown ContainerManagerEvent type: " + event.getType());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java
new file mode 100644
index 0000000..3944a3d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ChangeContainerResourceEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class ChangeContainerResourceEvent extends ContainerEvent {
+
+  private Resource resource;
+
+  public ChangeContainerResourceEvent(ContainerId c, Resource resource) {
+    super(c, ContainerEventType.CHANGE_CONTAINER_RESOURCE);
+    this.resource = resource;
+  }
+
+  public Resource getResource() {
+    return this.resource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index 5622f8c..dc712bf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -25,6 +25,10 @@ public enum ContainerEventType {
   KILL_CONTAINER,
   UPDATE_DIAGNOSTICS_MSG,
   CONTAINER_DONE,
+  CHANGE_CONTAINER_RESOURCE,
+
+  // Producer: ContainerMonitor
+  CONTAINER_RESOURCE_CHANGED,
 
   // DownloadManager
   CONTAINER_INITED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index f872a55..963e7c0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -194,8 +194,10 @@ public class DummyContainerManager extends 
ContainerManagerImpl {
   }
   
   @Override
-  protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
-      ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
+  protected void authorizeStartAndResourceIncreaseRequest(
+      NMTokenIdentifier nmTokenIdentifier,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      boolean startRequest) throws YarnException {
     // do nothing
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index a47e7f7..9a05278 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -189,6 +189,28 @@ public class TestContainerManagerWithLCE extends 
TestContainerManager {
     super.testStartContainerFailureWithUnknownAuxService();
   }
 
+  @Override
+  public void testIncreaseContainerResourceWithInvalidRequests() throws 
Exception {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testIncreaseContainerResourceWithInvalidRequests");
+    super.testIncreaseContainerResourceWithInvalidRequests();
+  }
+
+  @Override
+  public void testIncreaseContainerResourceWithInvalidResource() throws 
Exception {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testIncreaseContainerResourceWithInvalidResource");
+    super.testIncreaseContainerResourceWithInvalidResource();
+  }
+
   private boolean shouldRunTest() {
     return System
         .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != 
null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index a8e723d..5fc6d89 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -210,12 +210,13 @@ public abstract class BaseContainerManagerTest {
         // do nothing
       }
       @Override
-        protected void authorizeStartRequest(
-            NMTokenIdentifier nmTokenIdentifier,
-            ContainerTokenIdentifier containerTokenIdentifier) throws 
YarnException {
-          // do nothing
-        }
-      
+      protected void authorizeStartAndResourceIncreaseRequest(
+          NMTokenIdentifier nmTokenIdentifier,
+          ContainerTokenIdentifier containerTokenIdentifier,
+          boolean startRequest) throws YarnException {
+        // do nothing
+      }
+
       @Override
         protected void updateNMTokenIdentifier(
             NMTokenIdentifier nmTokenIdentifier) throws InvalidToken {
@@ -311,4 +312,34 @@ public abstract class BaseContainerManagerTest {
         app.getApplicationState().equals(finalState));
   }
 
+  public static void waitForNMContainerState(ContainerManagerImpl
+      containerManager, ContainerId containerID,
+          org.apache.hadoop.yarn.server.nodemanager.containermanager
+              .container.ContainerState finalState)
+                  throws InterruptedException, YarnException, IOException {
+    waitForNMContainerState(containerManager, containerID, finalState, 20);
+  }
+
+  public static void waitForNMContainerState(ContainerManagerImpl
+      containerManager, ContainerId containerID,
+          org.apache.hadoop.yarn.server.nodemanager.containermanager
+          .container.ContainerState finalState, int timeOutMax)
+              throws InterruptedException, YarnException, IOException {
+    Container container =
+        containerManager.getContext().getContainers().get(containerID);
+    org.apache.hadoop.yarn.server.nodemanager
+        .containermanager.container.ContainerState currentState =
+            container.getContainerState();
+    int timeoutSecs = 0;
+    while (!currentState.equals(finalState)
+        && timeoutSecs++ < timeOutMax) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for NM container to get into state " + finalState
+          + ". Current state is " + currentState);
+      currentState = container.getContainerState();
+    }
+    LOG.info("Container state is " + currentState);
+    Assert.assertEquals("ContainerState is not correct (timedout)",
+        finalState, currentState);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2934ff34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 7bdfdfb..853c789 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.Shell;
+import 
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -72,6 +74,7 @@ import 
org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
+import 
org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
@@ -88,6 +91,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.junit.Assert.assertEquals;
+
 public class TestContainerManager extends BaseContainerManagerTest {
 
   public TestContainerManager() throws UnsupportedFileSystemException {
@@ -804,7 +809,8 @@ public class TestContainerManager extends 
BaseContainerManagerTest {
         metrics, new ApplicationACLsManager(conf), dirsHandler);
     String strExceptionMsg = "";
     try {
-      cMgrImpl.authorizeStartRequest(null, new ContainerTokenIdentifier());
+      cMgrImpl.authorizeStartAndResourceIncreaseRequest(
+          null, new ContainerTokenIdentifier(), true);
     } catch(YarnException ye) {
       strExceptionMsg = ye.getMessage();
     }
@@ -813,7 +819,8 @@ public class TestContainerManager extends 
BaseContainerManagerTest {
 
     strExceptionMsg = "";
     try {
-      cMgrImpl.authorizeStartRequest(new NMTokenIdentifier(), null);
+      cMgrImpl.authorizeStartAndResourceIncreaseRequest(
+          new NMTokenIdentifier(), null, true);
     } catch(YarnException ye) {
       strExceptionMsg = ye.getMessage();
     }
@@ -879,6 +886,167 @@ public class TestContainerManager extends 
BaseContainerManagerTest {
         ContainerManagerImpl.INVALID_CONTAINERTOKEN_MSG);
   }
 
+  @Test
+  public void testIncreaseContainerResourceWithInvalidRequests() throws 
Exception {
+    containerManager.start();
+    // Start 4 containers 0..4 with default resource (1024, 1)
+    List<StartContainerRequest> list = new ArrayList<>();
+    ContainerLaunchContext containerLaunchContext = recordFactory
+        .newRecordInstance(ContainerLaunchContext.class);
+    for (int i = 0; i < 4; i++) {
+      ContainerId cId = createContainerId(i);
+      long identifier = DUMMY_RM_IDENTIFIER;
+      Token containerToken = createContainerToken(cId, identifier,
+          context.getNodeId(), user, context.getContainerTokenSecretManager());
+      StartContainerRequest request = StartContainerRequest.newInstance(
+          containerLaunchContext, containerToken);
+      list.add(request);
+    }
+    StartContainersRequest requestList = StartContainersRequest
+        .newInstance(list);
+    StartContainersResponse response = containerManager
+        .startContainers(requestList);
+
+    Assert.assertEquals(4, response.getSuccessfullyStartedContainers().size());
+    int i = 0;
+    for (ContainerId id : response.getSuccessfullyStartedContainers()) {
+      Assert.assertEquals(i, id.getContainerId());
+      i++;
+    }
+
+    Thread.sleep(2000);
+    // Construct container resource increase request,
+    List<Token> increaseTokens = new ArrayList<Token>();
+    // Add increase request for container-0, the request will fail as the
+    // container will have exited, and won't be in RUNNING state
+    ContainerId cId0 = createContainerId(0);
+    Token containerToken =
+        createContainerToken(cId0, DUMMY_RM_IDENTIFIER,
+            context.getNodeId(), user,
+                Resource.newInstance(1234, 3),
+                    context.getContainerTokenSecretManager(), null);
+    increaseTokens.add(containerToken);
+    // Add increase request for container-7, the request will fail as the
+    // container does not exist
+    ContainerId cId7 = createContainerId(7);
+    containerToken =
+        createContainerToken(cId7, DUMMY_RM_IDENTIFIER,
+            context.getNodeId(), user,
+            Resource.newInstance(1234, 3),
+            context.getContainerTokenSecretManager(), null);
+    increaseTokens.add(containerToken);
+
+    IncreaseContainersResourceRequest increaseRequest =
+        IncreaseContainersResourceRequest
+          .newInstance(increaseTokens);
+    IncreaseContainersResourceResponse increaseResponse =
+        containerManager.increaseContainersResource(increaseRequest);
+    // Check response
+    Assert.assertEquals(
+        0, increaseResponse.getSuccessfullyIncreasedContainers().size());
+    Assert.assertEquals(2, increaseResponse.getFailedRequests().size());
+    for (Map.Entry<ContainerId, SerializedException> entry : increaseResponse
+        .getFailedRequests().entrySet()) {
+      Assert.assertNotNull("Failed message", entry.getValue().getMessage());
+      if (cId0.equals(entry.getKey())) {
+        Assert.assertTrue(entry.getValue().getMessage()
+          .contains("Resource can only be changed when a "
+              + "container is in RUNNING state"));
+      } else if (cId7.equals(entry.getKey())) {
+        Assert.assertTrue(entry.getValue().getMessage()
+            .contains("Container " + cId7.toString()
+                + " is not handled by this NodeManager"));
+      } else {
+        throw new YarnException("Received failed request from wrong"
+            + " container: " + entry.getKey().toString());
+      }
+    }
+  }
+
+  @Test
+  public void testIncreaseContainerResourceWithInvalidResource() throws 
Exception {
+    containerManager.start();
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+    PrintWriter fileWriter = new PrintWriter(scriptFile);
+    // Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    if (Shell.WINDOWS) {
+      fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+    } else {
+      fileWriter.write("\numask 0");
+      fileWriter.write("\nexec sleep 100");
+    }
+    fileWriter.close();
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    URL resource_alpha =
+        ConverterUtils.getYarnUrlFromPath(localFS
+            .makeQualified(new Path(scriptFile.getAbsolutePath())));
+    LocalResource rsrc_alpha =
+        recordFactory.newRecordInstance(LocalResource.class);
+    rsrc_alpha.setResource(resource_alpha);
+    rsrc_alpha.setSize(-1);
+    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc_alpha.setType(LocalResourceType.FILE);
+    rsrc_alpha.setTimestamp(scriptFile.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put(destinationFile, rsrc_alpha);
+    containerLaunchContext.setLocalResources(localResources);
+    List<String> commands =
+        Arrays.asList(Shell.getRunScriptCommand(scriptFile));
+    containerLaunchContext.setCommands(commands);
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(
+            containerLaunchContext,
+            createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+            user, context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+    // Make sure the container reaches RUNNING state
+    BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
+        org.apache.hadoop.yarn.server.nodemanager.
+            containermanager.container.ContainerState.RUNNING);
+    // Construct container resource increase request,
+    List<Token> increaseTokens = new ArrayList<Token>();
+    // Add increase request. The increase request should fail
+    // as the current resource does not fit in the target resource
+    Token containerToken =
+        createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+            context.getNodeId(), user,
+            Resource.newInstance(512, 1),
+            context.getContainerTokenSecretManager(), null);
+    increaseTokens.add(containerToken);
+    IncreaseContainersResourceRequest increaseRequest =
+        IncreaseContainersResourceRequest
+            .newInstance(increaseTokens);
+    IncreaseContainersResourceResponse increaseResponse =
+        containerManager.increaseContainersResource(increaseRequest);
+    // Check response
+    Assert.assertEquals(
+        0, increaseResponse.getSuccessfullyIncreasedContainers().size());
+    Assert.assertEquals(1, increaseResponse.getFailedRequests().size());
+    for (Map.Entry<ContainerId, SerializedException> entry : increaseResponse
+        .getFailedRequests().entrySet()) {
+      if (cId.equals(entry.getKey())) {
+        Assert.assertNotNull("Failed message", entry.getValue().getMessage());
+        Assert.assertTrue(entry.getValue().getMessage()
+            .contains("The target resource "
+                + Resource.newInstance(512, 1).toString()
+                + " is smaller than the current resource "
+                + Resource.newInstance(1024, 1)));
+      } else {
+        throw new YarnException("Received failed request from wrong"
+            + " container: " + entry.getKey().toString());
+      }
+    }
+  }
+
   public static Token createContainerToken(ContainerId cId, long rmIdentifier,
       NodeId nodeId, String user,
       NMContainerTokenSecretManager containerTokenSecretManager)
@@ -893,15 +1061,21 @@ public class TestContainerManager extends 
BaseContainerManagerTest {
       LogAggregationContext logAggregationContext)
       throws IOException {
     Resource r = BuilderUtils.newResource(1024, 1);
+    return createContainerToken(cId, rmIdentifier, nodeId, user, r,
+        containerTokenSecretManager, logAggregationContext);
+  }
+
+  public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+      NodeId nodeId, String user, Resource resource,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext)
+      throws IOException {
     ContainerTokenIdentifier containerTokenIdentifier =
-        new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
+        new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
           System.currentTimeMillis() + 100000L, 123, rmIdentifier,
           Priority.newInstance(0), 0, logAggregationContext, null);
-    Token containerToken =
-        BuilderUtils
-          .newContainerToken(nodeId, containerTokenSecretManager
-            .retrievePassword(containerTokenIdentifier),
+    return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
+        .retrievePassword(containerTokenIdentifier),
             containerTokenIdentifier);
-    return containerToken;
   }
 }

Reply via email to