YARN-5620. Core changes in NodeManager to support re-initialization of 
Containers with new launchContext. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/40b5a59b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/40b5a59b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/40b5a59b

Branch: refs/heads/YARN-2915
Commit: 40b5a59b726733df456330a26f03d5174cc0bc1c
Parents: 2a8f55a
Author: Arun Suresh <asur...@apache.org>
Authored: Thu Sep 15 07:15:11 2016 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Thu Sep 15 07:15:11 2016 -0700

----------------------------------------------------------------------
 .../nodemanager/DefaultContainerExecutor.java   |   2 +-
 .../containermanager/ContainerManagerImpl.java  |  51 +++-
 .../containermanager/container/Container.java   |   6 +
 .../container/ContainerEventType.java           |   3 +-
 .../container/ContainerImpl.java                | 274 ++++++++++++++++---
 .../container/ContainerReInitEvent.java         |  62 +++++
 .../container/ContainerState.java               |   2 +-
 .../launcher/ContainersLauncher.java            |   1 +
 .../launcher/ContainersLauncherEventType.java   |   1 +
 .../localizer/ResourceLocalizationService.java  |   3 +-
 .../containermanager/localizer/ResourceSet.java |  43 ++-
 .../ContainerLocalizationRequestEvent.java      |   4 +-
 .../TestContainerManagerWithLCE.java            |  36 +++
 .../BaseContainerManagerTest.java               |   6 +-
 .../containermanager/TestContainerManager.java  | 238 +++++++++++++++-
 .../nodemanager/webapp/MockContainer.java       |  15 +
 16 files changed, 682 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index 9a0549d..59b69ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -89,7 +89,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
   }
 
   protected void copyFile(Path src, Path dst, String owner) throws IOException {
-    lfs.util().copy(src, dst);
+    lfs.util().copy(src, dst, false, true);
   }
   
   protected void setScriptExecutable(Path script, String owner) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 52d8566..f909ca5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -110,11 +110,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerReInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -163,6 +165,9 @@ import static org.apache.hadoop.service.Service.STATE.STARTED;
 public class ContainerManagerImpl extends CompositeService implements
     ContainerManager {
 
+  private enum ReinitOp {
+    UPGRADE, COMMIT, ROLLBACK, LOCALIZE;
+  }
   /**
    * Extra duration to wait for applications to be killed on shutdown.
    */
@@ -1529,18 +1534,8 @@ public class ContainerManagerImpl extends CompositeService implements
       ResourceLocalizationRequest request) throws YarnException, IOException {
 
     ContainerId containerId = request.getContainerId();
-    Container container = context.getContainers().get(containerId);
-    if (container == null) {
-      throw new YarnException("Specified " + containerId + " does not exist!");
-    }
-    if (!container.getContainerState()
-        .equals(org.apache.hadoop.yarn.server.nodemanager.
-            containermanager.container.ContainerState.RUNNING)) {
-      throw new YarnException(
-          containerId + " is at " + container.getContainerState()
-              + " state. Not able to localize new resources.");
-    }
-
+    Container container = preUpgradeOrLocalizeCheck(containerId,
+        ReinitOp.LOCALIZE);
     try {
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
           container.getResourceSet().addResources(request.getLocalResources());
@@ -1556,6 +1551,38 @@ public class ContainerManagerImpl extends CompositeService implements
     return ResourceLocalizationResponse.newInstance();
   }
 
+  public void upgradeContainer(ContainerId containerId,
+      ContainerLaunchContext upgradeLaunchContext) throws YarnException {
+    Container container = preUpgradeOrLocalizeCheck(containerId,
+        ReinitOp.UPGRADE);
+    ResourceSet resourceSet = new ResourceSet();
+    try {
+      resourceSet.addResources(upgradeLaunchContext.getLocalResources());
+      dispatcher.getEventHandler().handle(
+          new ContainerReInitEvent(containerId, upgradeLaunchContext,
+              resourceSet));
+      container.setIsReInitializing(true);
+    } catch (URISyntaxException e) {
+      LOG.info("Error when parsing local resource URI for upgrade of" +
+          "Container [" + containerId + "]", e);
+      throw new YarnException(e);
+    }
+  }
+
+  private Container preUpgradeOrLocalizeCheck(ContainerId containerId,
+      ReinitOp op) throws YarnException {
+    Container container = context.getContainers().get(containerId);
+    if (container == null) {
+      throw new YarnException("Specified " + containerId + " does not exist!");
+    }
+    if (!container.isRunning() || container.isReInitializing()) {
+      throw new YarnException("Cannot perform " + op + " on [" + containerId
+          + "]. Current state is [" + container.getContainerState() + ", " +
+          "isReInitializing=" + container.isReInitializing() + "].");
+    }
+    return container;
+  }
+
   @SuppressWarnings("unchecked")
   private void internalSignalToContainer(SignalContainerRequest request,
       String sentBy) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index c4cea18..f6c27ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -76,4 +76,10 @@ public interface Container extends EventHandler<ContainerEvent> {
   Priority getPriority();
 
   ResourceSet getResourceSet();
+
+  boolean isRunning();
+
+  void setIsReInitializing(boolean isReInitializing);
+
+  boolean isReInitializing();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index 5622f8c..0b57505 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -25,6 +25,7 @@ public enum ContainerEventType {
   KILL_CONTAINER,
   UPDATE_DIAGNOSTICS_MSG,
   CONTAINER_DONE,
+  REINITIALIZE_CONTAINER,
 
   // DownloadManager
   CONTAINER_INITED,
@@ -36,5 +37,5 @@ public enum ContainerEventType {
   CONTAINER_LAUNCHED,
   CONTAINER_EXITED_WITH_SUCCESS,
   CONTAINER_EXITED_WITH_FAILURE,
-  CONTAINER_KILLED_ON_REQUEST,
+  CONTAINER_KILLED_ON_REQUEST
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index ce9e581..12bbea9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -90,13 +91,24 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerImpl implements Container {
 
+  private final static class ReInitializationContext {
+    private final ResourceSet resourceSet;
+    private final ContainerLaunchContext newLaunchContext;
+
+    private ReInitializationContext(ContainerLaunchContext newLaunchContext,
+        ResourceSet resourceSet) {
+      this.newLaunchContext = newLaunchContext;
+      this.resourceSet = resourceSet;
+    }
+  }
+
   private final Lock readLock;
   private final Lock writeLock;
   private final Dispatcher dispatcher;
   private final NMStateStoreService stateStore;
   private final Credentials credentials;
   private final NodeManagerMetrics metrics;
-  private final ContainerLaunchContext launchContext;
+  private volatile ContainerLaunchContext launchContext;
   private final ContainerTokenIdentifier containerTokenIdentifier;
   private final ContainerId containerId;
   private volatile Resource resource;
@@ -110,13 +122,15 @@ public class ContainerImpl implements Container {
   private long containerLaunchStartTime;
   private ContainerMetrics containerMetrics;
   private static Clock clock = SystemClock.getInstance();
-  private final ContainerRetryContext containerRetryContext;
+  private ContainerRetryContext containerRetryContext;
   // remaining retries to relaunch container if needed
   private int remainingRetryAttempts;
   private String workDir;
   private String logDir;
   private String host;
   private String ips;
+  private ReInitializationContext reInitContext;
+  private volatile boolean isReInitializing = false;
 
   /** The NM-wide configuration - not specific to this container */
   private final Configuration daemonConf;
@@ -141,23 +155,7 @@ public class ContainerImpl implements Container {
     this.stateStore = context.getNMStateStore();
     this.version = containerTokenIdentifier.getVersion();
     this.launchContext = launchContext;
-    if (launchContext != null
-        && launchContext.getContainerRetryContext() != null) {
-      this.containerRetryContext = launchContext.getContainerRetryContext();
-    } else {
-      this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
-    }
-    this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
-    int minimumRestartInterval = conf.getInt(
-        YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
-        YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);
-    if (containerRetryContext.getRetryPolicy()
-        != ContainerRetryPolicy.NEVER_RETRY
-        && containerRetryContext.getRetryInterval() < minimumRestartInterval) {
-      LOG.info("Set restart interval to minimum value " + minimumRestartInterval
-          + "ms for container " + containerTokenIdentifier.getContainerID());
-      this.containerRetryContext.setRetryInterval(minimumRestartInterval);
-    }
+
     this.diagnosticsMaxSize = conf.getInt(
         YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
         YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
@@ -188,11 +186,37 @@ public class ContainerImpl implements Container {
       containerMetrics.recordStartTime(clock.getTime());
     }
 
+    // Configure the Retry Context
+    this.containerRetryContext =
+        configureRetryContext(conf, launchContext, this.containerId);
+    this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
     stateMachine = stateMachineFactory.make(this);
     this.context = context;
     this.resourceSet = new ResourceSet();
   }
 
+  private static ContainerRetryContext configureRetryContext(
+      Configuration conf, ContainerLaunchContext launchContext,
+      ContainerId containerId) {
+    ContainerRetryContext context;
+    if (launchContext != null
+        && launchContext.getContainerRetryContext() != null) {
+      context = launchContext.getContainerRetryContext();
+    } else {
+      context = ContainerRetryContext.NEVER_RETRY_CONTEXT;
+    }
+    int minimumRestartInterval = conf.getInt(
+        YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);
+    if (context.getRetryPolicy() != ContainerRetryPolicy.NEVER_RETRY
+        && context.getRetryInterval() < minimumRestartInterval) {
+      LOG.info("Set restart interval to minimum value " + minimumRestartInterval
+          + "ms for container " + containerId);
+      context.setRetryInterval(minimumRestartInterval);
+    }
+    return context;
+  }
+
   // constructor for a recovered container
   public ContainerImpl(Configuration conf, Dispatcher dispatcher,
       ContainerLaunchContext launchContext, Credentials creds,
@@ -299,6 +323,9 @@ public class ContainerImpl implements Container {
             ContainerState.EXITED_WITH_FAILURE),
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new RetryFailureTransition())
+    .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
+        ContainerEventType.REINITIALIZE_CONTAINER,
+        new ReInitializeContainerTransition())
     .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
         ContainerEventType.RESOURCE_LOCALIZED,
         new ResourceLocalizedWhileRunningTransition())
@@ -310,10 +337,38 @@ public class ContainerImpl implements Container {
        UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.RUNNING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
-    .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE,
+    .addTransition(ContainerState.RUNNING,
+        ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
         new KilledExternallyTransition())
 
+    // From REINITIALIZING State
+    .addTransition(ContainerState.REINITIALIZING,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+        new ExitedWithSuccessTransition(true))
+    .addTransition(ContainerState.REINITIALIZING,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition(true))
+    .addTransition(ContainerState.REINITIALIZING,
+        ContainerState.REINITIALIZING,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        new ResourceLocalizedWhileReInitTransition())
+    .addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
+        ContainerEventType.RESOURCE_FAILED,
+        new ResourceLocalizationFailedWhileReInitTransition())
+    .addTransition(ContainerState.REINITIALIZING,
+        ContainerState.REINITIALIZING,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.REINITIALIZING,
+        ContainerState.LOCALIZED,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        new KilledForReInitializationTransition())
+
     // From RELAUNCHING State
     .addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
         ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
@@ -458,7 +513,7 @@ public class ContainerImpl implements Container {
   }
 
   @Override
-  public Map<Path,List<String>> getLocalizedResources() {
+  public Map<Path, List<String>> getLocalizedResources() {
     this.readLock.lock();
     try {
       if (ContainerState.LOCALIZED == getContainerState()
@@ -775,7 +830,7 @@ public class ContainerImpl implements Container {
       ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
       LocalResourceRequest resourceRequest = rsrcEvent.getResource();
       Path location = rsrcEvent.getLocation();
-      List<String> syms =
+      Set<String> syms =
           container.resourceSet.resourceLocalized(resourceRequest, location);
       if (null == syms) {
         LOG.info("Localized resource " + resourceRequest +
@@ -822,17 +877,86 @@ public class ContainerImpl implements Container {
   }
 
   /**
-   * Resource is localized while the container is running - create symlinks
+   * Transition to start the Re-Initialization process.
+   */
+  static class ReInitializeContainerTransition extends ContainerTransition {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      container.reInitContext = createReInitContext(event);
+      try {
+        Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+            pendingResources =
+            container.reInitContext.resourceSet.getAllResourcesByVisibility();
+        if (!pendingResources.isEmpty()) {
+          container.dispatcher.getEventHandler().handle(
+              new ContainerLocalizationRequestEvent(
+                  container, pendingResources));
+        } else {
+          // We are not waiting on any resources, so...
+          // Kill the current container.
+          container.dispatcher.getEventHandler().handle(
+              new ContainersLauncherEvent(container,
+                  ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
+        }
+      } catch (Exception e) {
+        LOG.error("Container [" + container.getContainerId() + "]" +
+            " re-initialization failure..", e);
+        container.addDiagnostics("Error re-initializing due to" +
+            "[" + e.getMessage() + "]");
+      }
+    }
+
+    protected ReInitializationContext createReInitContext(
+        ContainerEvent event) {
+      ContainerReInitEvent rEvent = (ContainerReInitEvent)event;
+      return new ReInitializationContext(rEvent.getReInitLaunchContext(),
+          rEvent.getResourceSet());
+    }
+  }
+
+  /**
+   * Resource requested for Container Re-initialization has been localized.
+   * If all dependencies are met, then restart Container with new bits.
+   */
+  static class ResourceLocalizedWhileReInitTransition
+      extends ContainerTransition {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      ContainerResourceLocalizedEvent rsrcEvent =
+          (ContainerResourceLocalizedEvent) event;
+      container.reInitContext.resourceSet.resourceLocalized(
+          rsrcEvent.getResource(), rsrcEvent.getLocation());
+      // Check if all ResourceLocalization has completed
+      if (container.reInitContext.resourceSet.getPendingResources()
+          .isEmpty()) {
+        // Kill the current container.
+        container.dispatcher.getEventHandler().handle(
+            new ContainersLauncherEvent(container,
+                ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
+      }
+    }
+  }
+
+  /**
+   * Resource is localized while the container is running - create symlinks.
    */
   static class ResourceLocalizedWhileRunningTransition
       extends ContainerTransition {
 
+    @SuppressWarnings("unchecked")
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerResourceLocalizedEvent rsrcEvent =
           (ContainerResourceLocalizedEvent) event;
-      List<String> links = container.resourceSet
-          .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
+      Set<String> links = container.resourceSet.resourceLocalized(
+          rsrcEvent.getResource(), rsrcEvent.getLocation());
+      if (links == null) {
+        return;
+      }
       // creating symlinks.
       for (String link : links) {
         try {
@@ -872,8 +996,29 @@ public class ContainerImpl implements Container {
   }
 
   /**
+   * Resource localization failed while the container is reinitializing.
+   */
+  static class ResourceLocalizationFailedWhileReInitTransition
+      extends ContainerTransition {
+
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      ContainerResourceFailedEvent failedEvent =
+          (ContainerResourceFailedEvent) event;
+      container.resourceSet.resourceLocalizationFailed(
+          failedEvent.getResource());
+      container.addDiagnostics("Container aborting re-initialization.. "
+          + failedEvent.getDiagnosticMessage());
+      LOG.error("Container [" + container.getContainerId() + "] Re-init" +
+          " failed !! Resource [" + failedEvent.getResource() + "] could" +
+          " not be localized !!");
+      container.reInitContext = null;
+    }
+  }
+
+  /**
    * Transition from LOCALIZED state to RUNNING state upon receiving
-   * a CONTAINER_LAUNCHED event
+   * a CONTAINER_LAUNCHED event.
    */
   static class LaunchTransition extends ContainerTransition {
     @SuppressWarnings("unchecked")
@@ -883,6 +1028,12 @@ public class ContainerImpl implements Container {
       container.metrics.runningContainer();
       container.wasLaunched  = true;
 
+      if (container.reInitContext != null) {
+        container.reInitContext = null;
+        // Set rollback context here..
+        container.setIsReInitializing(false);
+      }
+
       if (container.recoveredAsKilled) {
         LOG.info("Killing " + container.containerId
             + " due to recovered as killed");
@@ -895,8 +1046,8 @@ public class ContainerImpl implements Container {
   }
 
   /**
-   * Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state
-   * upon EXITED_WITH_SUCCESS message.
+   * Transition from RUNNING or KILLING state to
+   * EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message.
    */
   @SuppressWarnings("unchecked")  // dispatcher not typed
   static class ExitedWithSuccessTransition extends ContainerTransition {
@@ -909,6 +1060,8 @@ public class ContainerImpl implements Container {
 
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
+
+      container.setIsReInitializing(false);
       // Set exit code to 0 on success         
       container.exitCode = 0;
        
@@ -939,6 +1092,7 @@ public class ContainerImpl implements Container {
 
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
+      container.setIsReInitializing(false);
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
       container.exitCode = exitEvent.getExitCode();
       if (exitEvent.getDiagnosticInfo() != null) {
@@ -959,7 +1113,7 @@ public class ContainerImpl implements Container {
   }
 
   /**
-   * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon
+   * Transition to EXITED_WITH_FAILURE or RELAUNCHING state upon
    * CONTAINER_EXITED_WITH_FAILURE state.
    **/
   @SuppressWarnings("unchecked")  // dispatcher not typed
@@ -991,7 +1145,7 @@ public class ContainerImpl implements Container {
           } catch (IOException e) {
             LOG.warn(
                 "Unable to update remainingRetryAttempts in state store for "
-                + container.getContainerId(), e);
+                    + container.getContainerId(), e);
           }
         }
         LOG.info("Relaunching Container " + container.getContainerId()
@@ -1053,7 +1207,7 @@ public class ContainerImpl implements Container {
   }
 
   /**
-   * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
+   * Transition to EXITED_WITH_FAILURE
    */
   static class KilledExternallyTransition extends ExitedWithFailureTransition {
     KilledExternallyTransition() {
@@ -1061,13 +1215,44 @@ public class ContainerImpl implements Container {
     }
 
     @Override
-    public void transition(ContainerImpl container, ContainerEvent event) {
+    public void transition(ContainerImpl container,
+        ContainerEvent event) {
       super.transition(container, event);
       container.addDiagnostics("Killed by external signal\n");
     }
   }
 
   /**
+   * Transition to LOCALIZED and wait for RE-LAUNCH
+   */
+  static class KilledForReInitializationTransition extends ContainerTransition {
+
+    @Override
+    public void transition(ContainerImpl container,
+        ContainerEvent event) {
+      LOG.info("Relaunching Container [" + container.getContainerId()
+          + "] for upgrade !!");
+      container.wasLaunched  = false;
+      container.metrics.endRunningContainer();
+
+      container.launchContext = container.reInitContext.newLaunchContext;
+
+      // Re configure the Retry Context
+      container.containerRetryContext =
+          configureRetryContext(container.context.getConf(),
+          container.launchContext, container.containerId);
+      // Reset the retry attempts since its a fresh start
+      container.remainingRetryAttempts =
+          container.containerRetryContext.getMaxRetries();
+
+      container.resourceSet = ResourceSet.merge(
+          container.resourceSet, container.reInitContext.resourceSet);
+
+      container.sendLaunchEvent();
+    }
+  }
+
+  /**
    * Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving
    * RESOURCE_FAILED event.
    */
@@ -1122,16 +1307,20 @@ public class ContainerImpl implements Container {
   }
 
   /**
-   * Transitions upon receiving KILL_CONTAINER:
-   * - LOCALIZED -> KILLING
-   * - RUNNING -> KILLING
+   * Transitions upon receiving KILL_CONTAINER.
+   * - LOCALIZED -> KILLING.
+   * - RUNNING -> KILLING.
+   * - REINITIALIZING -> KILLING.
    */
   @SuppressWarnings("unchecked") // dispatcher not typed
   static class KillTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
+
+    @SuppressWarnings("unchecked")
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       // Kill the process/process-grp
+      container.setIsReInitializing(false);
       container.dispatcher.getEventHandler().handle(
           new ContainersLauncherEvent(container,
               ContainersLauncherEventType.CLEANUP_CONTAINER));
@@ -1385,4 +1574,19 @@ public class ContainerImpl implements Container {
   public Priority getPriority() {
     return containerTokenIdentifier.getPriority();
   }
+
+  @Override
+  public boolean isRunning() {
+    return getContainerState() == ContainerState.RUNNING;
+  }
+
+  @Override
+  public void setIsReInitializing(boolean isReInitializing) {
+    this.isReInitializing = isReInitializing;
+  }
+
+  @Override
+  public boolean isReInitializing() {
+    return this.isReInitializing;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
new file mode 100644
index 0000000..2ccdbd7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
+
+/**
+ * ContainerEvent sent by ContainerManager to ContainerImpl to
+ * re-initialize a Container. It carries the launch context and the
+ * ResourceSet handed to the constructor.
+ */
+public class ContainerReInitEvent extends ContainerEvent {
+
+  private final ContainerLaunchContext reInitLaunchContext;
+  private final ResourceSet resourceSet;
+
+  /**
+   * Container Re-Init Event. The event type is always
+   * {@link ContainerEventType#REINITIALIZE_CONTAINER}.
+   * @param cID Container Id
+   * @param upgradeContext Launch context, later returned by
+   *                       {@link #getReInitLaunchContext()}
+   * @param resourceSet Resource Set, later returned by
+   *                    {@link #getResourceSet()}
+   */
+  public ContainerReInitEvent(ContainerId cID,
+      ContainerLaunchContext upgradeContext, ResourceSet resourceSet){
+    super(cID, ContainerEventType.REINITIALIZE_CONTAINER);
+    this.reInitLaunchContext = upgradeContext;
+    this.resourceSet = resourceSet;
+  }
+
+  /**
+   * Get the Launch Context to be used for the re-initialization.
+   * @return the ContainerLaunchContext passed to the constructor
+   */
+  public ContainerLaunchContext getReInitLaunchContext() {
+    return reInitLaunchContext;
+  }
+
+  /**
+   * Get the ResourceSet.
+   * @return the ResourceSet passed to the constructor
+   */
+  public ResourceSet getResourceSet() {
+    return resourceSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 6b96204..70de90c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -20,6 +20,6 @@ package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
 public enum ContainerState {
   NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING,
-  EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
+  REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
   CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index e5fff00..d4a7bfd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -137,6 +137,7 @@ public class ContainersLauncher extends AbstractService
         running.put(containerId, launch);
         break;
       case CLEANUP_CONTAINER:
+      case CLEANUP_CONTAINER_FOR_REINIT:
         ContainerLaunch launcher = running.remove(containerId);
         if (launcher == null) {
           // Container not launched. So nothing needs to be done.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 2d7bc74..380a032 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -23,5 +23,6 @@ public enum ContainersLauncherEventType {
   RELAUNCH_CONTAINER,
   RECOVER_CONTAINER,
   CLEANUP_CONTAINER, // The process(grp) itself.
+  CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
   SIGNAL_CONTAINER,
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index b281ef5..2cf6ee9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -470,7 +470,8 @@ public class ResourceLocalizationService extends 
CompositeService
       ContainerLocalizationRequestEvent rsrcReqs) {
     Container c = rsrcReqs.getContainer();
     EnumSet<ContainerState> set =
-        EnumSet.of(ContainerState.LOCALIZING, ContainerState.RUNNING);
+        EnumSet.of(ContainerState.LOCALIZING,
+            ContainerState.RUNNING, ContainerState.REINITIALIZING);
     if (!set.contains(c.getContainerState())) {
       LOG.warn(c.getContainerId() + " is at " + c.getContainerState()
           + " state, do not localize resources.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
index a41ee20..5da3abc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
@@ -43,9 +43,9 @@ public class ResourceSet {
   private static final Log LOG = LogFactory.getLog(ResourceSet.class);
 
   // resources by localization state (localized, pending, failed)
-  private Map<Path, List<String>> localizedResources =
+  private Map<String, Path> localizedResources =
       new ConcurrentHashMap<>();
-  private Map<LocalResourceRequest, List<String>> pendingResources =
+  private Map<LocalResourceRequest, Set<String>> pendingResources =
       new ConcurrentHashMap<>();
   private Set<LocalResourceRequest> resourcesFailedToBeLocalized =
       new HashSet<>();
@@ -69,7 +69,7 @@ public class ResourceSet {
     if (localResourceMap == null || localResourceMap.isEmpty()) {
       return null;
     }
-    Map<LocalResourceRequest, List<String>> allResources = new HashMap<>();
+    Map<LocalResourceRequest, Set<String>> allResources = new HashMap<>();
     List<LocalResourceRequest> publicList = new ArrayList<>();
     List<LocalResourceRequest> privateList = new ArrayList<>();
     List<LocalResourceRequest> appList = new ArrayList<>();
@@ -77,7 +77,7 @@ public class ResourceSet {
     for (Map.Entry<String, LocalResource> rsrc : localResourceMap.entrySet()) {
       LocalResource resource = rsrc.getValue();
       LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
-      allResources.putIfAbsent(req, new ArrayList<>());
+      allResources.putIfAbsent(req, new HashSet<>());
       allResources.get(req).add(rsrc.getKey());
       storeSharedCacheUploadPolicy(req,
           resource.getShouldBeUploadedToSharedCache());
@@ -121,13 +121,15 @@ public class ResourceSet {
    * @param location The path where the resource is localized
    * @return The list of symlinks for the localized resources.
    */
-  public List<String> resourceLocalized(LocalResourceRequest request,
+  public Set<String> resourceLocalized(LocalResourceRequest request,
       Path location) {
-    List<String> symlinks = pendingResources.remove(request);
+    Set<String> symlinks = pendingResources.remove(request);
     if (symlinks == null) {
       return null;
     } else {
-      localizedResources.put(location, symlinks);
+      for (String symlink : symlinks) {
+        localizedResources.put(symlink, location);
+      }
       return symlinks;
     }
   }
@@ -175,7 +177,12 @@ public class ResourceSet {
   }
 
   public Map<Path, List<String>> getLocalizedResources() {
-    return localizedResources;
+    Map<Path, List<String>> map = new HashMap<>();
+    for (Map.Entry<String, Path> entry : localizedResources.entrySet()) {
+      map.putIfAbsent(entry.getValue(), new ArrayList<>());
+      map.get(entry.getValue()).add(entry.getKey());
+    }
+    return map;
   }
 
   public Map<LocalResourceRequest, Path> getResourcesToBeUploaded() {
@@ -186,7 +193,25 @@ public class ResourceSet {
     return resourcesUploadPolicies;
   }
 
-  public Map<LocalResourceRequest, List<String>> getPendingResources() {
+  public Map<LocalResourceRequest, Set<String>> getPendingResources() {
     return pendingResources;
   }
+
+  public static ResourceSet merge(ResourceSet... resourceSets) {
+    ResourceSet merged = new ResourceSet();
+    for (ResourceSet rs : resourceSets) {
+      // Keyed by symlink: a later set replaces the Path of an existing symlink
+      merged.localizedResources.putAll(rs.localizedResources);
+
+      merged.resourcesToBeUploaded.putAll(rs.resourcesToBeUploaded);
+      merged.resourcesUploadPolicies.putAll(rs.resourcesUploadPolicies);
+
+      // TODO : START : Should we de-dup here ?
+      merged.publicRsrcs.addAll(rs.publicRsrcs);
+      merged.privateRsrcs.addAll(rs.privateRsrcs);
+      merged.appRsrcs.addAll(rs.appRsrcs);
+      // TODO : END
+    }
+    return merged; // pending and failed-to-localize resources are not copied
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
index 43a2f33..0344275 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
@@ -39,8 +39,8 @@ public class ContainerLocalizationRequestEvent extends
 
   /**
    * Event requesting the localization of the rsrc.
-   * @param c
-   * @param rsrc
+   * @param c Container
+   * @param rsrc LocalResourceRequests map
    */
   public ContainerLocalizationRequestEvent(Container c,
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index aa0d975..8a27849 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -269,6 +269,42 @@ public class TestContainerManagerWithLCE extends 
TestContainerManager {
     super.testForcefulShutdownSignal();
   }
 
+  @Override
+  public void testContainerUpgradeSuccess() throws IOException,
+      InterruptedException, YarnException {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testContainerUpgradeSuccess");
+    super.testContainerUpgradeSuccess();
+  }
+
+  @Override
+  public void testContainerUpgradeLocalizationFailure() throws IOException,
+      InterruptedException, YarnException {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testContainerUpgradeLocalizationFailure");
+    super.testContainerUpgradeLocalizationFailure();
+  }
+
+  @Override
+  public void testContainerUpgradeProcessFailure() throws IOException,
+      InterruptedException, YarnException {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testContainerUpgradeProcessFailure");
+    super.testContainerUpgradeProcessFailure();
+  }
+
   private boolean shouldRunTest() {
     return System
         .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index ec38501..d359c3d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -123,7 +123,11 @@ public abstract class BaseContainerManagerTest {
       conf) {
     public int getHttpPort() {
       return HTTP_PORT;
-    };
+    }
+    @Override
+    public ContainerExecutor getContainerExecutor() {
+      return exec;
+    }
   };
   protected ContainerExecutor exec;
   protected DeletionService delSrvc;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 5785e1f..843dc2a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -33,6 +34,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -64,6 +66,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -94,7 +98,6 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import 
org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -366,6 +369,237 @@ public class TestContainerManager extends 
BaseContainerManagerTest {
       DefaultContainerExecutor.containerIsAlive(pid));
   }
 
+  /**
+   * Starts a container, re-initializes it with a new launch context and
+   * verifies that the old process is gone, that the new start file holds
+   * the new greeting plus a different pid, that the old start file is
+   * still readable, and that the new process is alive.
+   */
+  @Test
+  public void testContainerUpgradeSuccess() throws IOException,
+      InterruptedException, YarnException {
+    containerManager.start();
+    // Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+
+    String pid = prepareInitialContainer(cId, oldStartFile);
+
+    File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
+
+    prepareContainerUpgrade(false, false, cId, newStartFile);
+
+    // Assert that the First process is not alive anymore
+    Assert.assertFalse("Process is still alive!",
+        DefaultContainerExecutor.containerIsAlive(pid));
+
+    // try-with-resources: the reader is closed even if an assertion fails
+    String newPid;
+    try (BufferedReader reader =
+        new BufferedReader(new FileReader(newStartFile))) {
+      Assert.assertEquals("Upgrade World!", reader.readLine());
+
+      // Get the pid of the process
+      newPid = reader.readLine().trim();
+      Assert.assertNotEquals("Old and New Pids must be different !",
+          pid, newPid);
+      // No more lines
+      Assert.assertEquals(null, reader.readLine());
+    }
+
+    // Verify old file still exists and is accessible by
+    // the new process...
+    try (BufferedReader reader =
+        new BufferedReader(new FileReader(oldStartFile))) {
+      Assert.assertEquals("Hello World!", reader.readLine());
+    }
+
+    // Assert that the New process is alive
+    Assert.assertTrue("New Process is not alive!",
+        DefaultContainerExecutor.containerIsAlive(newPid));
+  }
+
+  @Test
+  public void testContainerUpgradeLocalizationFailure() throws IOException,
+      InterruptedException, YarnException {
+    if (Shell.WINDOWS) { // this scenario is skipped on Windows
+      return;
+    }
+    containerManager.start();
+    // Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+
+    String pid = prepareInitialContainer(cId, oldStartFile);
+
+    File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
+
+    prepareContainerUpgrade(true, true, cId, newStartFile); // failCmd, failLoc
+
+    // Assert that the first process is STILL alive: the upgrade asked for a
+    // resource at a non-existent path, so it is expected not to replace it.
+    Assert.assertTrue("Process is NOT alive!",
+        DefaultContainerExecutor.containerIsAlive(pid));
+  }
+
+  @Test
+  public void testContainerUpgradeProcessFailure() throws IOException,
+      InterruptedException, YarnException {
+    if (Shell.WINDOWS) { // this scenario is skipped on Windows
+      return;
+    }
+    containerManager.start();
+    // Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+
+    String pid = prepareInitialContainer(cId, oldStartFile);
+
+    File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
+
+    prepareContainerUpgrade(true, false, cId, newStartFile); // failCmd only
+
+    // Assert that the first process is not alive anymore (only the new
+    // command fails here; localization uses the real script file)
+    Assert.assertFalse("Process is still alive!",
+        DefaultContainerExecutor.containerIsAlive(pid));
+  }
+
+  /**
+   * Prepare a launch Context for container upgrade and request the
+   * Container Manager to re-initialize a running container using the
+   * new launch context. The upgrade is requested twice; if the second
+   * request throws, its message must contain "Cannot perform UPGRADE",
+   * but a second request that does not throw is accepted silently.
+   * Finally polls for startFile once a second (at most 10 times when
+   * failLoc is set, otherwise 20) without asserting that it was created.
+   * @param failCmd injects a start script that intentionally fails.
+   * @param failLoc injects a bad file Location that will fail localization.
+   * @param cId id of the container to upgrade.
+   * @param startFile file the new start script writes to.
+   */
+  private void prepareContainerUpgrade(boolean failCmd, boolean failLoc,
+      ContainerId cId, File startFile)
+      throws FileNotFoundException, YarnException, InterruptedException {
+    // Re-write scriptfile and processStartFile
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
+    PrintWriter fileWriter = new PrintWriter(scriptFile); // closed by callee
+
+    writeScriptFile(fileWriter, "Upgrade World!", startFile, cId, failCmd);
+
+    ContainerLaunchContext containerLaunchContext =
+        prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc);
+
+    containerManager.upgradeContainer(cId, containerLaunchContext);
+    try { // second request; passes silently if nothing is thrown
+      containerManager.upgradeContainer(cId, containerLaunchContext);
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("Cannot perform UPGRADE"));
+    }
+    int timeoutSecs = 0;
+    int maxTimeToWait = failLoc ? 10 : 20; // loop iterations, ~1s each
+    // Wait for new processStartfile to be created
+    while (!startFile.exists() && timeoutSecs++ < maxTimeToWait) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for New process start-file to be created");
+    }
+  }
+
+  /**
+   * Prepare and start an initial container. This container will be
+   * subsequently re-initialized for upgrade. It also waits (polling once
+   * a second, at most 20 times) for the container to write its start file
+   * and returns the Pid of the running container.
+   * @param cId id of the container to start.
+   * @param startFile file the start script writes its greeting and pid to.
+   * @return the second line of startFile, trimmed (the pid).
+   */
+  private String prepareInitialContainer(ContainerId cId, File startFile)
+      throws IOException, YarnException, InterruptedException {
+    File scriptFileOld = Shell.appendScriptExtension(tmpDir, "scriptFile");
+    PrintWriter fileWriterOld = new PrintWriter(scriptFileOld);
+
+    // writeScriptFile closes the writer once the script is written
+    writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false);
+
+    ContainerLaunchContext containerLaunchContext =
+        prepareContainerLaunchContext(scriptFileOld, "dest_file", false);
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+            createContainerToken(cId,
+                DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+                context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    int timeoutSecs = 0;
+    while (!startFile.exists() && timeoutSecs++ < 20) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for process start-file to be created");
+    }
+    Assert.assertTrue("ProcessStartFile doesn't exist!",
+        startFile.exists());
+
+    // Now verify the contents of the file; try-with-resources so the
+    // reader is closed even if an assertion fails
+    String pid;
+    try (BufferedReader reader =
+        new BufferedReader(new FileReader(startFile))) {
+      Assert.assertEquals("Hello World!", reader.readLine());
+      // Get the pid of the process
+      pid = reader.readLine().trim();
+      // No more lines
+      Assert.assertEquals(null, reader.readLine());
+    }
+
+    // Assert that the process is alive
+    Assert.assertTrue("Process is not alive!",
+        DefaultContainerExecutor.containerIsAlive(pid));
+    // Once more
+    Assert.assertTrue("Process is not alive!",
+        DefaultContainerExecutor.containerIsAlive(pid));
+    return pid;
+  }
+
+  private void writeScriptFile(PrintWriter fileWriter, String startLine,
+      File processStartFile, ContainerId cId, boolean isFailure) {
+    if (Shell.WINDOWS) { // NOTE: isFailure is not consulted on Windows
+      fileWriter.println("@echo " + startLine + "> " + processStartFile);
+      fileWriter.println("@echo " + cId + ">> " + processStartFile);
+      fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+    } else {
+      fileWriter.write("\numask 0"); // So that start file is readable by test
+      if (isFailure) {
+        // Echo PID and exit with error code 111 (startLine is not written)
+        fileWriter.write("\necho $$ >> " + processStartFile);
+        fileWriter.write("\nexit 111");
+      } else {
+        fileWriter.write("\necho " + startLine + " > " + processStartFile);
+        fileWriter.write("\necho $$ >> " + processStartFile);
+        fileWriter.write("\nexec sleep 100");
+      }
+    }
+    fileWriter.close(); // callers hand over the writer and do not close it
+  }
+
+  private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile,
+      String destFName, boolean putBadFile) {
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    URL resourceAlpha = null;
+    if (putBadFile) { // resource will point at a path that does not exist
+      File fileToDelete = new File(tmpDir, "fileToDelete")
+          .getAbsoluteFile();
+      resourceAlpha =
+          URL.fromPath(localFS
+              .makeQualified(new Path(fileToDelete.getAbsolutePath())));
+      fileToDelete.delete(); // make sure nothing exists at that path
+    } else {
+      resourceAlpha =
+          URL.fromPath(localFS
+              .makeQualified(new Path(scriptFile.getAbsolutePath())));
+    }
+    LocalResource rsrcAlpha =
+        recordFactory.newRecordInstance(LocalResource.class);
+    rsrcAlpha.setResource(resourceAlpha);
+    rsrcAlpha.setSize(-1);
+    rsrcAlpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrcAlpha.setType(LocalResourceType.FILE);
+    rsrcAlpha.setTimestamp(scriptFile.lastModified());
+    Map<String, LocalResource> localResources = new HashMap<>();
+    localResources.put(destFName, rsrcAlpha); // destFName is the map key
+    containerLaunchContext.setLocalResources(localResources);
+
+    ContainerRetryContext containerRetryContext = ContainerRetryContext
+        .newInstance( // 111 = exit code of the failing test script
+            ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
+            new HashSet<>(Arrays.asList(Integer.valueOf(111))), 4, 0);
+    containerLaunchContext.setContainerRetryContext(containerRetryContext);
+    List<String> commands = Arrays.asList(
+        Shell.getRunScriptCommand(scriptFile));
+    containerLaunchContext.setCommands(commands);
+    return containerLaunchContext;
+  }
+
   protected void testContainerLaunchAndExit(int exitCode) throws IOException,
       InterruptedException, YarnException {
 
@@ -556,7 +790,7 @@ public class TestContainerManager extends 
BaseContainerManagerTest {
       Assert.fail();
     } catch (YarnException e) {
       Assert.assertTrue(
-          e.getMessage().contains("Not able to localize new resources"));
+          e.getMessage().contains("Cannot perform LOCALIZE"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index c176556..8c8bec7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -190,4 +190,19 @@ public class MockContainer implements Container {
   public void setIpAndHost(String[] ipAndHost) {
 
   }
+
+  @Override
+  public boolean isRunning() {
+    return false;
+  }
+
+  @Override
+  public void setIsReInitializing(boolean isReInitializing) {
+
+  }
+
+  @Override
+  public boolean isReInitializing() {
+    return false;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to