[
https://issues.apache.org/jira/browse/YARN-8980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760904#comment-17760904
]
ASF GitHub Bot commented on YARN-8980:
--------------------------------------
slfan1989 commented on code in PR #5975:
URL: https://github.com/apache/hadoop/pull/5975#discussion_r1311548384
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java:
##########
@@ -142,14 +156,124 @@ protected void testUAMRestart(boolean keepContainers)
throws Exception {
numContainers = 1;
am.allocate("127.0.0.1", 1000, numContainers, new
ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
- conts = am.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers();
+ allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts = allocateResponse.getAllocatedContainers();
while (conts.size() < numContainers) {
nm.nodeHeartbeat(true);
- conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers());
+ allocateResponse =
+ am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts.addAll(allocateResponse.getAllocatedContainers());
Thread.sleep(100);
}
+ checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+ rm.stop();
+ }
+
+ protected void testUAMRestartWithoutTransferContainer(boolean
keepContainers) throws Exception {
+ // start RM
+ MockRM rm = new MockRM();
+ rm.start();
+ MockNM nm =
+ new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+ nm.registerNode();
+ Set<NodeId> tokenCacheClientSide = new HashSet();
+
+ // create app and launch the UAM
+ boolean unamanged = true;
+ int maxAttempts = 1;
+ boolean waitForAccepted = true;
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
+ .withAppName("")
+ .withUser(UserGroupInformation.getCurrentUser().getShortUserName())
+ .withAcls(null)
+ .withUnmanagedAM(unamanged)
+ .withQueue(null)
+ .withMaxAppAttempts(maxAttempts)
+ .withCredentials(null)
+ .withAppType(null)
+ .withWaitForAppAcceptedState(waitForAccepted)
+ .withKeepContainers(keepContainers)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+
+ MockAM am = MockRM.launchUAM(app, rm, nm);
+
+ // Register for the first time
+ am.registerAppAttempt();
+
+ // Allocate two containers to UAM
+ int numContainers = 3;
+ AllocateResponse allocateResponse =
+ am.allocate("127.0.0.1", 1000, numContainers, new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ List<Container> conts = allocateResponse.getAllocatedContainers();
+ while (conts.size() < numContainers) {
+ nm.nodeHeartbeat(true);
+ allocateResponse =
+ am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts.addAll(allocateResponse.getAllocatedContainers());
+ Thread.sleep(100);
+ }
+ checkNMTokenForContainer(tokenCacheClientSide, conts);
+
+ // Release all containers, then there are no transfer containfer app
attempt
+ List<ContainerId> releaseList = new ArrayList();
+ releaseList.add(conts.get(0).getId());
+ releaseList.add(conts.get(1).getId());
+ releaseList.add(conts.get(2).getId());
+ List<ContainerStatus> finishedConts =
+ am.allocate(new ArrayList<ResourceRequest>(), releaseList)
+ .getCompletedContainersStatuses();
+ while (finishedConts.size() < releaseList.size()) {
+ nm.nodeHeartbeat(true);
+ finishedConts
+ .addAll(am
+ .allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>())
+ .getCompletedContainersStatuses());
+ Thread.sleep(100);
+ }
+
+ // Register for the second time
+ RegisterApplicationMasterResponse response = null;
+ try {
+ response = am.registerAppAttempt(false);
+ // When AM restart, it means nmToken in client side should be missing
+ tokenCacheClientSide.clear();
+ response.getNMTokensFromPreviousAttempts()
+ .forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
+ } catch (InvalidApplicationMasterRequestException e) {
+ Assert.assertEquals(false, keepContainers);
+ return;
+ }
+ Assert.assertEquals("RM should not allow second register"
+ + " for UAM without keep container flag ", true, keepContainers);
+
+ // Expecting the zero running containers previously
+ Assert.assertEquals(0,
response.getContainersFromPreviousAttempts().size());
+ Assert.assertEquals(0, response.getNMTokensFromPreviousAttempts().size());
+
+ // Allocate one more containers to UAM, just to be safe
+ numContainers = 1;
+ am.allocate("127.0.0.1", 1000, numContainers, new
ArrayList<ContainerId>());
+ nm.nodeHeartbeat(true);
+ allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts = allocateResponse.getAllocatedContainers();
+ while (conts.size() < numContainers) {
+ nm.nodeHeartbeat(true);
+ allocateResponse =
+ am.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>());
+ allocateResponse.getNMTokens().forEach(token ->
tokenCacheClientSide.add(token.getNodeId()));
+ conts.addAll(allocateResponse.getAllocatedContainers());
+ Thread.sleep(100);
+ }
Review Comment:
Should MockNM and MockAM be closed?
> MapReduce application container start fails after AM restart.
> -------------------------------------------------------------
>
> Key: YARN-8980
> URL: https://issues.apache.org/jira/browse/YARN-8980
> Project: Hadoop YARN
> Issue Type: Sub-task
> Reporter: Bibin Chundatt
> Assignee: zhengchenyu
> Priority: Major
> Labels: pull-request-available
>
> UAMs to subclusters are always launched with keepContainers.
> In AM restart scenarios, the UAM registers again with the RM and receives the
> running containers along with their NMTokens. The NMTokens received by the UAM in
> getPreviousAttemptContainersNMToken are never used by the MapReduce application.
> The Federation Interceptor should take care of such scenarios too: merge the
> NMTokens received at registration into the allocate response.
> A container allocation response for the same node will have an empty NMToken.
> Issue credits: [~Nallasivan]
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]