[ 
https://issues.apache.org/jira/browse/YARN-8980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760904#comment-17760904
 ] 

ASF GitHub Bot commented on YARN-8980:
--------------------------------------

slfan1989 commented on code in PR #5975:
URL: https://github.com/apache/hadoop/pull/5975#discussion_r1311548384


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java:
##########
@@ -142,14 +156,124 @@ protected void testUAMRestart(boolean keepContainers) 
throws Exception {
     numContainers = 1;
     am.allocate("127.0.0.1", 1000, numContainers, new 
ArrayList<ContainerId>());
     nm.nodeHeartbeat(true);
-    conts = am.allocate(new ArrayList<ResourceRequest>(),
-        new ArrayList<ContainerId>()).getAllocatedContainers();
+    allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new 
ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> 
tokenCacheClientSide.add(token.getNodeId()));
+    conts = allocateResponse.getAllocatedContainers();
     while (conts.size() < numContainers) {
       nm.nodeHeartbeat(true);
-      conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers());
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new 
ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> 
tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
       Thread.sleep(100);
     }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+    rm.stop();
+  }
+
+  protected void testUAMRestartWithoutTransferContainer(boolean 
keepContainers) throws Exception {
+    // start RM
+    MockRM rm = new MockRM();
+    rm.start();
+    MockNM nm =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm.registerNode();
+    Set<NodeId> tokenCacheClientSide = new HashSet();
+
+    // create app and launch the UAM
+    boolean unamanged = true;
+    int maxAttempts = 1;
+    boolean waitForAccepted = true;
+    MockRMAppSubmissionData data =
+        MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
+            .withAppName("")
+            .withUser(UserGroupInformation.getCurrentUser().getShortUserName())
+            .withAcls(null)
+            .withUnmanagedAM(unamanged)
+            .withQueue(null)
+            .withMaxAppAttempts(maxAttempts)
+            .withCredentials(null)
+            .withAppType(null)
+            .withWaitForAppAcceptedState(waitForAccepted)
+            .withKeepContainers(keepContainers)
+            .build();
+    RMApp app = MockRMAppSubmitter.submit(rm, data);
+
+    MockAM am = MockRM.launchUAM(app, rm, nm);
+
+    // Register for the first time
+    am.registerAppAttempt();
+
+    // Allocate two containers to UAM
+    int numContainers = 3;
+    AllocateResponse allocateResponse =
+        am.allocate("127.0.0.1", 1000, numContainers, new 
ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> 
tokenCacheClientSide.add(token.getNodeId()));
+    List<Container> conts = allocateResponse.getAllocatedContainers();
+    while (conts.size() < numContainers) {
+      nm.nodeHeartbeat(true);
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new 
ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> 
tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
+      Thread.sleep(100);
+    }
+    checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+    // Release all containers, then there are no transfer containfer app 
attempt
+    List<ContainerId> releaseList = new ArrayList();
+    releaseList.add(conts.get(0).getId());
+    releaseList.add(conts.get(1).getId());
+    releaseList.add(conts.get(2).getId());
+    List<ContainerStatus> finishedConts =
+        am.allocate(new ArrayList<ResourceRequest>(), releaseList)
+            .getCompletedContainersStatuses();
+    while (finishedConts.size() < releaseList.size()) {
+      nm.nodeHeartbeat(true);
+      finishedConts
+          .addAll(am
+              .allocate(new ArrayList<ResourceRequest>(),
+                  new ArrayList<ContainerId>())
+              .getCompletedContainersStatuses());
+      Thread.sleep(100);
+    }
+
+    // Register for the second time
+    RegisterApplicationMasterResponse response = null;
+    try {
+      response = am.registerAppAttempt(false);
+      // When AM restart, it means nmToken in client side should be missing
+      tokenCacheClientSide.clear();
+      response.getNMTokensFromPreviousAttempts()
+          .forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+    } catch (InvalidApplicationMasterRequestException e) {
+      Assert.assertEquals(false, keepContainers);
+      return;
+    }
+    Assert.assertEquals("RM should not allow second register"
+        + " for UAM without keep container flag ", true, keepContainers);
+
+    // Expecting the zero running containers previously
+    Assert.assertEquals(0, 
response.getContainersFromPreviousAttempts().size());
+    Assert.assertEquals(0, response.getNMTokensFromPreviousAttempts().size());
+
+    // Allocate one more containers to UAM, just to be safe
+    numContainers = 1;
+    am.allocate("127.0.0.1", 1000, numContainers, new 
ArrayList<ContainerId>());
+    nm.nodeHeartbeat(true);
+    allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new 
ArrayList<ContainerId>());
+    allocateResponse.getNMTokens().forEach(token -> 
tokenCacheClientSide.add(token.getNodeId()));
+    conts = allocateResponse.getAllocatedContainers();
+    while (conts.size() < numContainers) {
+      nm.nodeHeartbeat(true);
+      allocateResponse =
+          am.allocate(new ArrayList<ResourceRequest>(), new 
ArrayList<ContainerId>());
+      allocateResponse.getNMTokens().forEach(token -> 
tokenCacheClientSide.add(token.getNodeId()));
+      conts.addAll(allocateResponse.getAllocatedContainers());
+      Thread.sleep(100);
+    }

Review Comment:
   Should mockNM and MockAM be closed?





> Mapreduce application container start  fail after AM restart.
> -------------------------------------------------------------
>
>                 Key: YARN-8980
>                 URL: https://issues.apache.org/jira/browse/YARN-8980
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>            Reporter: Bibin Chundatt
>            Assignee: zhengchenyu
>            Priority: Major
>              Labels: pull-request-available
>
> UAM to subclusters are always launched with keepContainers.
> On AM restart scenarios , UAM register again with RM . UAM receive running 
> containers with NMToken. NMToken received by UAM in 
> getPreviousAttemptContainersNMToken is never used by mapreduce application.  
> Federation Interceptor should take care of such scenarios too. Merge NMToken 
> received at registration to allocate response.
> Container allocation response on same node will have NMToken empty.
> issue credits : [~Nallasivan]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to