YARN-7486. Race condition in service AM that can cause NPE. Contributed by Jian He


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f4d5d202
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f4d5d202
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f4d5d202

Branch: refs/heads/YARN-6592
Commit: f4d5d20286eb05449f6fd7cd6ff0554228205fe2
Parents: 462e25a
Author: Billie Rinaldi <bil...@apache.org>
Authored: Wed Nov 15 10:20:46 2017 -0800
Committer: Billie Rinaldi <bil...@apache.org>
Committed: Thu Nov 16 07:58:06 2017 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/service/ServiceScheduler.java   |  50 ++++-----
 .../yarn/service/component/Component.java       |  58 ++++------
 .../yarn/service/component/ComponentEvent.java  |  11 ++
 .../component/instance/ComponentInstance.java   |  83 +++++++-------
 .../containerlaunch/ContainerLaunchService.java |   2 +-
 .../provider/AbstractProviderService.java       |   5 +-
 .../yarn/service/provider/ProviderService.java  |   5 +-
 .../yarn/service/provider/ProviderUtils.java    |   5 +-
 .../ServiceTimelinePublisher.java               |   5 +-
 .../hadoop/yarn/service/MockServiceAM.java      |  66 ++++++++---
 .../hadoop/yarn/service/ServiceTestUtils.java   |   5 +-
 .../hadoop/yarn/service/TestServiceAM.java      | 109 +++++++++++++++++++
 .../service/monitor/TestServiceMonitor.java     |  12 ++
 13 files changed, 290 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index a7b7e22..6bc5673 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -132,7 +132,6 @@ public class ServiceScheduler extends CompositeService {
   private AMRMClientAsync<AMRMClient.ContainerRequest> amRMClient;
   private NMClientAsync nmClient;
   private AsyncDispatcher dispatcher;
-  AsyncDispatcher compInstanceDispatcher;
   private YarnRegistryViewForProviders yarnRegistryOperations;
   private ServiceContext context;
   private ContainerLaunchService containerLaunchService;
@@ -152,7 +151,7 @@ public class ServiceScheduler extends CompositeService {
     yarnRegistryOperations =
         createYarnRegistryOperations(context, registryClient);
 
-    // register metrics
+    // register metrics,
     serviceMetrics = ServiceMetrics
         .register(app.getName(), "Metrics for service");
     serviceMetrics.tag("type", "Metrics type [component or service]", "service");
@@ -167,14 +166,11 @@ public class ServiceScheduler extends CompositeService {
     dispatcher = new AsyncDispatcher("Component  dispatcher");
     dispatcher.register(ComponentEventType.class,
         new ComponentEventHandler());
+    dispatcher.register(ComponentInstanceEventType.class,
+        new ComponentInstanceEventHandler());
     dispatcher.setDrainEventsOnStop();
     addIfService(dispatcher);
 
-    compInstanceDispatcher =
-        new AsyncDispatcher("CompInstance dispatcher");
-    compInstanceDispatcher.register(ComponentInstanceEventType.class,
-        new ComponentInstanceEventHandler());
-    addIfService(compInstanceDispatcher);
     containerLaunchService = new ContainerLaunchService(context.fs);
     addService(containerLaunchService);
 
@@ -277,10 +273,10 @@ public class ServiceScheduler extends CompositeService {
   }
 
   private void recoverComponents(RegisterApplicationMasterResponse response) {
-    List<Container> recoveredContainers = response
+    List<Container> containersFromPrevAttempt = response
         .getContainersFromPreviousAttempts();
     LOG.info("Received {} containers from previous attempt.",
-        recoveredContainers.size());
+        containersFromPrevAttempt.size());
     Map<String, ServiceRecord> existingRecords = new HashMap<>();
     List<String> existingComps = null;
     try {
@@ -302,9 +298,8 @@ public class ServiceScheduler extends CompositeService {
         }
       }
     }
-    for (Container container : recoveredContainers) {
-      LOG.info("Handling container {} from previous attempt",
-          container.getId());
+    for (Container container : containersFromPrevAttempt) {
+      LOG.info("Handling {} from previous attempt", container.getId());
       ServiceRecord record = existingRecords.get(RegistryPathUtils
           .encodeYarnID(container.getId().toString()));
       if (record != null) {
@@ -487,16 +482,21 @@ public class ServiceScheduler extends CompositeService {
             new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED)
                 .setContainer(container);
         dispatcher.getEventHandler().handle(event);
-        Collection<AMRMClient.ContainerRequest> requests = amRMClient
-            .getMatchingRequests(container.getAllocationRequestId());
-        LOG.info("[COMPONENT {}]: {} outstanding container requests.",
-            comp.getName(), requests.size());
-        // remove the corresponding request
-        if (requests.iterator().hasNext()) {
-          LOG.info("[COMPONENT {}]: removing one container request.", comp
-              .getName());
-          AMRMClient.ContainerRequest request = requests.iterator().next();
-          amRMClient.removeContainerRequest(request);
+        try {
+          Collection<AMRMClient.ContainerRequest> requests = amRMClient
+              .getMatchingRequests(container.getAllocationRequestId());
+          LOG.info("[COMPONENT {}]: remove {} outstanding container requests " +
+                  "for allocateId " + container.getAllocationRequestId(),
+              comp.getName(), requests.size());
+          // remove the corresponding request
+          if (requests.iterator().hasNext()) {
+            AMRMClient.ContainerRequest request = requests.iterator().next();
+            amRMClient.removeContainerRequest(request);
+          }
+        } catch(Exception e) {
+          //TODO Due to YARN-7490, exception may be thrown, catch and ignore for
+          //now.
+          LOG.error("Exception when removing the matching requests. ", e);
         }
       }
     }
@@ -569,7 +569,7 @@ public class ServiceScheduler extends CompositeService {
       }
       ComponentEvent event =
           new ComponentEvent(instance.getCompName(), CONTAINER_STARTED)
-              .setInstance(instance);
+              .setInstance(instance).setContainerId(containerId);
       dispatcher.getEventHandler().handle(event);
     }
 
@@ -649,10 +649,6 @@ public class ServiceScheduler extends CompositeService {
     liveInstances.remove(containerId);
   }
 
-  public AsyncDispatcher getCompInstanceDispatcher() {
-    return compInstanceDispatcher;
-  }
-
   public YarnRegistryViewForProviders getYarnRegistryOperations() {
     return yarnRegistryOperations;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 7208f39..88f4763 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -82,7 +82,8 @@ public class Component implements 
EventHandler<ComponentEvent> {
   private Map<String, ComponentInstance> compInstances =
       new ConcurrentHashMap<>();
   // component instances to be assigned with a container
-  private List<ComponentInstance> pendingInstances = new LinkedList<>();
+  private List<ComponentInstance> pendingInstances =
+      Collections.synchronizedList(new LinkedList<>());
   private ContainerFailureTracker failureTracker;
   private Probe probe;
   private final ReentrantReadWriteLock.ReadLock readLock;
@@ -94,7 +95,7 @@ public class Component implements 
EventHandler<ComponentEvent> {
 
   private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
       stateMachine;
-  private AsyncDispatcher compInstanceDispatcher;
+  private AsyncDispatcher dispatcher;
   private static final StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>
       stateMachineFactory =
       new StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>(
@@ -149,7 +150,7 @@ public class Component implements 
EventHandler<ComponentEvent> {
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
     this.stateMachine = stateMachineFactory.make(this);
-    compInstanceDispatcher = scheduler.getCompInstanceDispatcher();
+    dispatcher = scheduler.getDispatcher();
     failureTracker =
         new ContainerFailureTracker(context, this);
     probe = MonitorUtils.getProbe(componentSpec.getReadinessCheck());
@@ -256,30 +257,18 @@ public class Component implements 
EventHandler<ComponentEvent> {
         component.releaseContainer(container);
         return;
       }
-      if (instance.hasContainer()) {
-        LOG.info(
-            "[COMPONENT {}]: Instance {} already has container, release " +
-                "surplus container {}",
-            instance.getCompName(), instance.getCompInstanceId(), container
-                .getId());
-        component.releaseContainer(container);
-        return;
-      }
+
       component.pendingInstances.remove(instance);
-      LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
-              "host {}, num pending component instances reduced to {} ",
-          component.getName(), container.getId(), instance
-              .getCompInstanceName(), container.getNodeId(), component
-              .pendingInstances.size());
       instance.setContainer(container);
       ProviderUtils.initCompInstanceDir(component.getContext().fs, instance);
       component.getScheduler().addLiveCompInstance(container.getId(), instance);
-      LOG.info("[COMPONENT {}]: Marking {} as started for component " +
-          "instance {}", component.getName(), event.getContainer().getId(),
-          instance.getCompInstanceId());
-      component.compInstanceDispatcher.getEventHandler().handle(
-          new ComponentInstanceEvent(instance.getContainerId(),
-              START));
+      LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
+              "host {}, num pending component instances reduced to {} ",
+          component.getName(), container.getId(),
+          instance.getCompInstanceName(), container.getNodeId(),
+          component.pendingInstances.size());
+      component.dispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(container.getId(), START));
     }
   }
 
@@ -288,9 +277,8 @@ public class Component implements 
EventHandler<ComponentEvent> {
 
     @Override public ComponentState transition(Component component,
         ComponentEvent event) {
-      component.compInstanceDispatcher.getEventHandler().handle(
-          new ComponentInstanceEvent(event.getInstance().getContainerId(),
-              START));
+      component.dispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(event.getContainerId(), START));
       return checkIfStable(component);
     }
   }
@@ -313,14 +301,7 @@ public class Component implements 
EventHandler<ComponentEvent> {
     @Override
     public void transition(Component component, ComponentEvent event) {
       component.updateMetrics(event.getStatus());
-
-      // add back to pending list
-      component.pendingInstances.add(event.getInstance());
-      LOG.info(
-          "[COMPONENT {}]: {} completed, num pending comp instances increased to {}.",
-          component.getName(), event.getStatus().getContainerId(),
-          component.pendingInstances.size());
-      component.compInstanceDispatcher.getEventHandler().handle(
+      component.dispatcher.getEventHandler().handle(
           new ComponentInstanceEvent(event.getStatus().getContainerId(),
               STOP).setStatus(event.getStatus()));
       component.componentSpec.setState(
@@ -328,8 +309,8 @@ public class Component implements 
EventHandler<ComponentEvent> {
     }
   }
 
-  public ServiceMetrics getCompMetrics () {
-    return componentMetrics;
+  public void reInsertPendingInstance(ComponentInstance instance) {
+    pendingInstances.add(instance);
   }
 
   private void releaseContainer(Container container) {
@@ -581,4 +562,9 @@ public class Component implements 
EventHandler<ComponentEvent> {
   public ServiceContext getContext() {
     return context;
   }
+
+  // Only for testing
+  public List<ComponentInstance> getPendingInstances() {
+    return pendingInstances;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
index d93dcf1..447b436 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.service.component;
 
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
@@ -30,6 +31,16 @@ public class ComponentEvent extends 
AbstractEvent<ComponentEventType> {
   private Container container;
   private ComponentInstance instance;
   private ContainerStatus status;
+  private ContainerId containerId;
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public ComponentEvent setContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+    return this;
+  }
 
   public ComponentEvent(String name, ComponentEventType type) {
     super(type);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index 9e5f98c..509f667 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -146,7 +146,7 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
       compInstance.containerStatusFuture =
           compInstance.scheduler.executorService.scheduleAtFixedRate(
               new ContainerStatusRetriever(compInstance.scheduler,
-                  compInstance.getContainerId(), compInstance), 0, 1,
+                  event.getContainerId(), compInstance), 0, 1,
               TimeUnit.SECONDS);
       compInstance.component.incRunningContainers();
       long containerStartTime = System.currentTimeMillis();
@@ -160,10 +160,10 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
       }
       org.apache.hadoop.yarn.service.api.records.Container container =
           new org.apache.hadoop.yarn.service.api.records.Container();
-      container.setId(compInstance.getContainerId().toString());
+      container.setId(event.getContainerId().toString());
       container.setLaunchTime(new Date(containerStartTime));
       container.setState(ContainerState.RUNNING_BUT_UNREADY);
-      container.setBareHost(compInstance.container.getNodeId().getHost());
+      container.setBareHost(compInstance.getNodeId().getHost());
       container.setComponentInstanceName(compInstance.getCompInstanceName());
       if (compInstance.containerSpec != null) {
         // remove the previous container.
@@ -219,15 +219,11 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
       // re-ask the failed container.
       Component comp = compInstance.component;
       comp.requestContainers(1);
-      LOG.info(compInstance.getCompInstanceId()
-              + ": Container completed. Requested a new container." + System
-              .lineSeparator() + " exitStatus={}, diagnostics={}.",
-          event.getStatus().getExitStatus(),
-          event.getStatus().getDiagnostics());
       String containerDiag =
           compInstance.getCompInstanceId() + ": " + event.getStatus()
               .getDiagnostics();
       compInstance.diagnostics.append(containerDiag + System.lineSeparator());
+      compInstance.cancelContainerStatusRetriever();
 
       if (compInstance.getState().equals(READY)) {
         compInstance.component.decContainersReady();
@@ -255,11 +251,13 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
         // hdfs dir content will be overwritten when a new container gets started,
         // so no need remove.
         compInstance.scheduler.executorService
-            .submit(compInstance::cleanupRegistry);
+            .submit(() -> compInstance.cleanupRegistry(event.getContainerId()));
+
         if (compInstance.timelineServiceEnabled) {
           // record in ATS
-          compInstance.serviceTimelinePublisher.componentInstanceFinished
-              (compInstance, event.getStatus().getExitStatus(), containerDiag);
+          compInstance.serviceTimelinePublisher
+              .componentInstanceFinished(event.getContainerId(),
+                  event.getStatus().getExitStatus(), containerDiag);
         }
         compInstance.containerSpec.setState(ContainerState.STOPPED);
       }
@@ -267,6 +265,14 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
       // remove the failed ContainerId -> CompInstance mapping
       comp.getScheduler().removeLiveCompInstance(event.getContainerId());
 
+      comp.reInsertPendingInstance(compInstance);
+
+      LOG.info(compInstance.getCompInstanceId()
+              + ": {} completed. Reinsert back to pending list and requested " +
+              "a new container." + System.lineSeparator() +
+              " exitStatus={}, diagnostics={}.",
+          event.getContainerId(), event.getStatus().getExitStatus(),
+          event.getStatus().getDiagnostics());
       if (shouldExit) {
         // Sleep for 5 seconds in hope that the state can be recorded in ATS.
         // in case there's a client polling the comp state, it can be notified.
@@ -277,8 +283,6 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
         }
         ExitUtil.terminate(-1);
       }
-
-      compInstance.removeContainer();
     }
   }
 
@@ -312,15 +316,6 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
     }
   }
 
-  public boolean hasContainer() {
-    return this.container != null;
-  }
-
-  public void removeContainer() {
-    this.container = null;
-    this.compInstanceId.setContainerId(null);
-  }
-
   public void setContainer(Container container) {
     this.container = container;
     this.compInstanceId.setContainerId(container.getId());
@@ -337,7 +332,7 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
   public void updateContainerStatus(ContainerStatus status) {
     this.status = status;
     org.apache.hadoop.yarn.service.api.records.Container container =
-        getCompSpec().getContainer(getContainerId().toString());
+        getCompSpec().getContainer(status.getContainerId().toString());
     if (container != null) {
       container.setIp(StringUtils.join(",", status.getIPs()));
       container.setHostname(status.getHost());
@@ -348,10 +343,6 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
     updateServiceRecord(yarnRegistryOperations, status);
   }
 
-  public ContainerId getContainerId() {
-    return container.getId();
-  }
-
   public String getCompName() {
     return compInstanceId.getCompName();
   }
@@ -423,12 +414,7 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
   public void destroy() {
     LOG.info(getCompInstanceId() + ": Flexed down by user, destroying.");
     diagnostics.append(getCompInstanceId() + ": Flexed down by user");
-    if (container != null) {
-      scheduler.removeLiveCompInstance(container.getId());
-      component.getScheduler().getAmRMClient()
-          .releaseAssignedContainer(container.getId());
-      getCompSpec().removeContainer(containerSpec);
-    }
+
     // update metrics
     if (getState() == STARTED) {
       component.decRunningContainers();
@@ -437,16 +423,29 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
       component.decContainersReady();
       component.decRunningContainers();
     }
+    getCompSpec().removeContainer(containerSpec);
+
+    if (container == null) {
+      LOG.info(getCompInstanceId() + " no container is assigned when " +
+          "destroying");
+      return;
+    }
+
+    ContainerId containerId = container.getId();
+    scheduler.removeLiveCompInstance(containerId);
+    component.getScheduler().getAmRMClient()
+        .releaseAssignedContainer(containerId);
 
     if (timelineServiceEnabled) {
-      serviceTimelinePublisher.componentInstanceFinished(this,
+      serviceTimelinePublisher.componentInstanceFinished(containerId,
           KILLED_BY_APPMASTER, diagnostics.toString());
     }
-    scheduler.executorService.submit(this::cleanupRegistryAndCompHdfsDir);
+    cancelContainerStatusRetriever();
+    scheduler.executorService.submit(() ->
+        cleanupRegistryAndCompHdfsDir(containerId));
   }
 
-  private void cleanupRegistry() {
-    ContainerId containerId = getContainerId();
+  private void cleanupRegistry(ContainerId containerId) {
     String cid = RegistryPathUtils.encodeYarnID(containerId.toString());
     try {
        yarnRegistryOperations.deleteComponent(getCompInstanceId(), cid);
@@ -456,8 +455,8 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
   }
 
   //TODO Maybe have a dedicated cleanup service.
-  public void cleanupRegistryAndCompHdfsDir() {
-    cleanupRegistry();
+  public void cleanupRegistryAndCompHdfsDir(ContainerId containerId) {
+    cleanupRegistry(containerId);
     try {
       if (compInstanceDir != null && fs.exists(compInstanceDir)) {
         boolean deleted = fs.delete(compInstanceDir, true);
@@ -515,6 +514,12 @@ public class ComponentInstance implements 
EventHandler<ComponentInstanceEvent>,
     }
   }
 
+  private void cancelContainerStatusRetriever() {
+    if (containerStatusFuture != null && !containerStatusFuture.isDone()) {
+      containerStatusFuture.cancel(true);
+    }
+  }
+
   @Override
   public int compareTo(ComponentInstance to) {
     long delta = containerStartedTime - to.containerStartedTime;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
index 0e51a62..b9f3a24 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
@@ -87,7 +87,7 @@ public class ContainerLaunchService extends AbstractService{
       AbstractLauncher launcher = new AbstractLauncher(fs, null);
       try {
         provider.buildContainerLaunchContext(launcher, service,
-            instance, fs, getConfig());
+            instance, fs, getConfig(), container);
         instance.getComponent().getScheduler().getNmClient()
             .startContainerAsync(container,
                 launcher.completeContainerLaunch());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
index 6d74061..7015591 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.provider;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
 import org.apache.hadoop.yarn.service.api.records.Component;
@@ -55,7 +56,7 @@ public abstract class AbstractProviderService implements 
ProviderService,
 
   public void buildContainerLaunchContext(AbstractLauncher launcher,
       Service service, ComponentInstance instance,
-      SliderFileSystem fileSystem, Configuration yarnConf)
+      SliderFileSystem fileSystem, Configuration yarnConf, Container container)
       throws IOException, SliderException {
     Component component = instance.getComponent().getComponentSpec();;
     processArtifact(launcher, instance, fileSystem, service);
@@ -67,7 +68,7 @@ public abstract class AbstractProviderService implements 
ProviderService,
     Map<String, String> globalTokens =
         instance.getComponent().getScheduler().globalTokens;
     Map<String, String> tokensForSubstitution = ProviderUtils
-        .initCompTokensForSubstitute(instance);
+        .initCompTokensForSubstitute(instance, container);
     tokensForSubstitution.putAll(globalTokens);
     // Set the environment variables in launcher
     launcher.putEnv(ServiceUtils

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
index eb721b4..11015ea 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.service.provider;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
@@ -34,6 +35,6 @@ public interface ProviderService {
    */
   void buildContainerLaunchContext(AbstractLauncher containerLauncher,
       Service service, ComponentInstance instance,
-      SliderFileSystem sliderFileSystem, Configuration yarnConf)
-      throws IOException, SliderException;
+      SliderFileSystem sliderFileSystem, Configuration yarnConf, Container
+      container) throws IOException, SliderException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index e074dd7..c0c44c3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.service.ServiceContext;
@@ -393,13 +394,13 @@ public class ProviderUtils implements 
YarnServiceConstants {
    * @return tokens to replace
    */
   public static Map<String, String> initCompTokensForSubstitute(
-      ComponentInstance instance) {
+      ComponentInstance instance, Container container) {
     Map<String, String> tokens = new HashMap<>();
     tokens.put(COMPONENT_NAME, instance.getCompSpec().getName());
     tokens
         .put(COMPONENT_NAME_LC, 
instance.getCompSpec().getName().toLowerCase());
     tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName());
-    tokens.put(CONTAINER_ID, instance.getContainer().getId().toString());
+    tokens.put(CONTAINER_ID, container.getId().toString());
     tokens.put(COMPONENT_ID,
         String.valueOf(instance.getCompInstanceId().getId()));
     tokens.putAll(instance.getComponent().getDependencyHostIpTokens());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
index c522986..949ce19 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.timelineservice;
 
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@@ -178,10 +179,10 @@ public class ServiceTimelinePublisher extends 
CompositeService {
     putEntity(entity);
   }
 
-  public void componentInstanceFinished(ComponentInstance instance,
+  public void componentInstanceFinished(ContainerId containerId,
       int exitCode, String diagnostics) {
     TimelineEntity entity = createComponentInstanceEntity(
-        instance.getContainer().getId().toString());
+        containerId.toString());
 
     // create info keys
     Map<String, Object> entityInfos = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
index d343a03..4298161 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
@@ -24,14 +24,8 @@ import 
org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
+
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -42,15 +36,15 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.ComponentState;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import 
org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
 import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
 import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
+import org.apache.hadoop.yarn.util.Records;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Mockito.mock;
@@ -63,6 +57,8 @@ public class MockServiceAM extends ServiceMaster {
   final List<Container> feedContainers =
       Collections.synchronizedList(new LinkedList<>());
 
+  final List<ContainerStatus> failedContainers =
+      Collections.synchronizedList(new LinkedList<>());
   public MockServiceAM(Service service) {
     super(service.getName());
     this.service = service;
@@ -102,10 +98,10 @@ public class MockServiceAM extends ServiceMaster {
 
             AllocateResponse.AllocateResponseBuilder builder =
                 AllocateResponse.newBuilder();
+            // add new containers if any
             synchronized (feedContainers) {
               if (feedContainers.isEmpty()) {
                 System.out.println("Allocating........ no containers");
-                return builder.build();
               } else {
                 // The AMRMClient will return containers for compoenent that 
are
                 // at FLEXING state
@@ -121,9 +117,20 @@ public class MockServiceAM extends ServiceMaster {
                     itor.remove();
                   }
                 }
-                return 
builder.allocatedContainers(allocatedContainers).build();
+                builder.allocatedContainers(allocatedContainers);
+              }
+            }
+
+            // add failed containers if any
+            synchronized (failedContainers) {
+              if (!failedContainers.isEmpty()) {
+                List<ContainerStatus> failed =
+                    new LinkedList<>(failedContainers);
+                failedContainers.clear();
+                builder.completedContainersStatuses(failed);
               }
             }
+            return builder.build();
           }
 
           @Override
@@ -184,6 +191,19 @@ public class MockServiceAM extends ServiceMaster {
     return container;
   }
 
+  public void feedFailedContainerToComp(Service service, int id, String
+      compName) {
+    ApplicationId applicationId = ApplicationId.fromString(service.getId());
+    ContainerId containerId = ContainerId
+        .newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), 
id);
+    ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
+    containerStatus.setContainerId(containerId);
+    synchronized (failedContainers) {
+      failedContainers.add(containerStatus);
+    }
+  }
+
+
   public void flexComponent(String compName, long numberOfContainers)
       throws IOException {
     ClientAMProtocol.ComponentCountProto componentCountProto =
@@ -218,4 +238,22 @@ public class MockServiceAM extends ServiceMaster {
       }
     }, 1000, 20000);
   }
+
+
+  public ComponentInstance getCompInstance(String compName, String
+      instanceName) {
+    return context.scheduler.getAllComponents().get(compName)
+        .getComponentInstance(instanceName);
+  }
+
+  public void waitForCompInstanceState(ComponentInstance instance,
+      ComponentInstanceState state)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return instance.getState().equals(state);
+      }
+    }, 1000, 20000);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index cf32880..a70a0c2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -65,6 +65,7 @@ public class ServiceTestUtils {
 
   private MiniYARNCluster yarnCluster = null;
   private MiniDFSCluster hdfsCluster = null;
+  TestingCluster zkCluster;
   private FileSystem fs = null;
   private Configuration conf = null;
   public static final int NUM_NMS = 1;
@@ -165,7 +166,6 @@ public class ServiceTestUtils {
     conf.setBoolean(NM_VMEM_CHECK_ENABLED, false);
     conf.setBoolean(NM_PMEM_CHECK_ENABLED, false);
     // setup zk cluster
-    TestingCluster zkCluster;
     zkCluster = new TestingCluster(1);
     zkCluster.start();
     conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
@@ -239,6 +239,9 @@ public class ServiceTestUtils {
         hdfsCluster = null;
       }
     }
+    if (zkCluster != null) {
+      zkCluster.stop();
+    }
     if (basedir != null) {
       FileUtils.deleteDirectory(basedir);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
new file mode 100644
index 0000000..fb4de0d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import 
org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.registry.client.api.RegistryConstants
+    .KEY_REGISTRY_ZK_QUORUM;
+
+public class TestServiceAM extends ServiceTestUtils{
+
+  private File basedir;
+  YarnConfiguration conf = new YarnConfiguration();
+  TestingCluster zkCluster;
+
+  @Before
+  public void setup() throws Exception {
+    basedir = new File("target", "apps");
+    if (basedir.exists()) {
+      FileUtils.deleteDirectory(basedir);
+    } else {
+      basedir.mkdirs();
+    }
+    zkCluster = new TestingCluster(1);
+    zkCluster.start();
+    conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
+    System.out.println("ZK cluster: " +  zkCluster.getConnectString());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (basedir != null) {
+      FileUtils.deleteDirectory(basedir);
+    }
+    if (zkCluster != null) {
+      zkCluster.stop();
+    }
+  }
+
+  // Race condition YARN-7486
+  // 1. Allocate 1 container to compa and wait for it to be started.
+  // 2. Fail this container and, in the meantime, allocate the 2nd container.
+  // 3. The 2nd container should not be assigned to the compa-0 instance,
+  //    because the compa-0 instance is not stopped yet.
+  // 4. Check that compa still has the instance in the pending list.
+  @Test
+  public void testContainerCompleted() throws TimeoutException,
+      InterruptedException {
+    ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
+    Service exampleApp = new Service();
+    exampleApp.setId(applicationId.toString());
+    exampleApp.setName("testContainerCompleted");
+    exampleApp.addComponent(createComponent("compa", 1, "pwd"));
+
+    MockServiceAM am = new MockServiceAM(exampleApp);
+    am.init(conf);
+    am.start();
+
+    ComponentInstance compa0 = am.getCompInstance("compa", "compa-0");
+    // allocate a container
+    am.feedContainerToComp(exampleApp, 1, "compa");
+    am.waitForCompInstanceState(compa0, ComponentInstanceState.STARTED);
+
+    System.out.println("Fail the container 1");
+    // fail the container
+    am.feedFailedContainerToComp(exampleApp, 1, "compa");
+
+    // Allocate the second container immediately. This container will not be
+    // assigned to the comp instance, because the instance has not yet been
+    // added to the pending list.
+    am.feedContainerToComp(exampleApp, 2, "compa");
+
+    am.waitForCompInstanceState(compa0, ComponentInstanceState.INIT);
+    // still 1 pending instance
+    Assert.assertEquals(1,
+        am.getComponent("compa").getPendingInstances().size());
+    am.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d5d202/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java
index 0e03a2c..e25d38d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.yarn.service.monitor;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.MockServiceAM;
@@ -37,10 +38,14 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 
+import static org.apache.hadoop.registry.client.api.RegistryConstants
+    .KEY_REGISTRY_ZK_QUORUM;
+
 public class TestServiceMonitor extends ServiceTestUtils {
 
   private File basedir;
   YarnConfiguration conf = new YarnConfiguration();
+  TestingCluster zkCluster;
 
   @Before
   public void setup() throws Exception {
@@ -51,6 +56,10 @@ public class TestServiceMonitor extends ServiceTestUtils {
       basedir.mkdirs();
     }
     conf.setLong(YarnServiceConf.READINESS_CHECK_INTERVAL, 2);
+    zkCluster = new TestingCluster(1);
+    zkCluster.start();
+    conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
+    System.out.println("ZK cluster: " +  zkCluster.getConnectString());
   }
 
   @After
@@ -58,6 +67,9 @@ public class TestServiceMonitor extends ServiceTestUtils {
     if (basedir != null) {
       FileUtils.deleteDirectory(basedir);
     }
+    if (zkCluster != null) {
+      zkCluster.stop();
+    }
   }
 
   // Create compa with 1 container


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to