[
https://issues.apache.org/jira/browse/YARN-11306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607307#comment-17607307
]
ASF GitHub Bot commented on YARN-11306:
---------------------------------------
goiri commented on code in PR #4897:
URL: https://github.com/apache/hadoop/pull/4897#discussion_r975603420
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1242,4 +1255,181 @@ public void testRemoveAppFromRegistryApplicationFailed()
return null;
});
}
+
+ public void testRecoverWithBadSubCluster(final RegistryOperations registryObj)
+ throws IOException, InterruptedException {
+
+ UserGroupInformation ugi =
+ interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+ // Prepare a list of subclusters
+ List<SubClusterId> subClusterIds = new ArrayList<>();
+ SubClusterId sc1 = SubClusterId.newInstance("SC-1");
+ SubClusterId sc2 = SubClusterId.newInstance("SC-2");
+ SubClusterId homeSC = SubClusterId.newInstance(HOME_SC_ID);
+ subClusterIds.add(sc1);
+ subClusterIds.add(sc2);
+ subClusterIds.add(homeSC);
+
+ // Prepare AMRMProxy Context
+ AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext,
+ getConf(), attemptId, "test-user", null, null, null, registryObj);
+
+ // Prepare RegisterApplicationMasterRequest
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+ // Step1. Prepare subClusters SC-1, SC-2, HomeSC and Interceptor
+ initSubClusterAndInterceptor(subClusterIds, registryObj);
+
+ // Step2. Register Application And Assign Containers
+ List<Container> containers = registerApplicationAndAssignContainers(registerReq);
+
+ // Step3. Offline SC-1 cluster
+ offlineSubClusterSC1(sc1);
+
+ // Step4. Recover ApplicationMaster
+ recoverApplicationMaster(appContext);
+
+ // Step5. We recovered ApplicationMaster.
+ // SC-1 was offline, SC-2 was recovered at this time, UnmanagedAMPool.size=1 and only SC-2
+ UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool();
+ Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+ Assert.assertNotNull(allUAMIds);
+ Assert.assertTrue(allUAMIds.size() == 1);
+ Assert.assertTrue(allUAMIds.contains(sc2.getId()));
+
+ // Step6. The first allocate call expects a fail-over exception and re-register.
+ AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(0);
+ LambdaTestUtils.intercept(ApplicationMasterNotRegisteredException.class,
+ "AMRMProxy just restarted and recovered for " + this.attemptId +
+ ". AM should re-register and full re-send pending requests.",
+ () -> interceptor.allocate(allocateRequest));
+ interceptor.registerApplicationMaster(registerReq);
+
+ // Step7. release Containers
+ releaseContainers(containers, sc1);
+
+ // Step8. finish application
+ finishApplication();
+
+ return null;
+ });
+ }
+
+ private void initSubClusterAndInterceptor(List<SubClusterId> subClusterIds,
+ RegistryOperations registryObj) throws YarnException {
+ // Prepare subClusters SC-1, SC-2, HomeSC
+ for (SubClusterId subClusterId : subClusterIds) {
+ registerSubCluster(subClusterId);
+ }
+
+ // Prepare Interceptor
+ interceptor = new TestableFederationInterceptor();
+ AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext,
+ getConf(), attemptId, "test-user", null, null, null, registryObj);
+ interceptor.init(appContext);
+ interceptor.cleanupRegistry();
+ }
+
+ private List<Container> registerApplicationAndAssignContainers(
+ RegisterApplicationMasterRequest registerReq) throws Exception {
+
+ // Register HomeSC
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+
+ // We only registered HomeSC, so UnmanagedAMPoolSize should be empty
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // We assign 3 Containers to each cluster
+ int numberOfContainers = 3;
+ List<Container> containers =
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 3);
+
+ // At this point, UnmanagedAMPoolSize should be equal to 2 and should contain SC-1, SC-2
+ Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+ UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool();
+ Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+ Assert.assertNotNull(allUAMIds);
+ Assert.assertTrue(allUAMIds.size() == 2);
+ Assert.assertTrue(allUAMIds.contains("SC-1"));
+ Assert.assertTrue(allUAMIds.contains("SC-2"));
+
+ // Make sure all async hb threads are done
+ interceptor.drainAllAsyncQueue(true);
+
+ return containers;
+ }
+
+ private void offlineSubClusterSC1(SubClusterId subClusterId) throws YarnException {
+
+ ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+ interceptor.getSecondaryRMs();
+
+ // SC-1 out of service
+ deRegisterSubCluster(subClusterId);
+ secondaries.get(subClusterId.getId()).setRunningMode(false);
+ }
+
+ private void recoverApplicationMaster(AMRMProxyApplicationContext appContext)
+ throws IOException {
+ // Prepare for Federation Interceptor restart and recover
+ Map<String, byte[]> recoveredDataMap =
+ recoverDataMapForAppAttempt(nmStateStore, attemptId);
+
+ // Preserve the mock RM instances
+ MockResourceManagerFacade homeRM = interceptor.getHomeRM();
+
+ // Create a new interceptor instance and recover
+ interceptor = new TestableFederationInterceptor(homeRM,
+ interceptor.getSecondaryRMs());
+ interceptor.init(appContext);
+ interceptor.recover(recoveredDataMap);
+ }
+
+ private void releaseContainers(List<Container> containers, SubClusterId subClusterId)
+ throws Exception {
+
+ ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+ interceptor.getSecondaryRMs();
+ lastResponseId = 0;
+
+ // Get the Container list of SC-1
+ MockResourceManagerFacade sc1Facade = secondaries.get("SC-1");
+ HashMap<ApplicationId, List<ContainerId>> appContainerMap =
+ sc1Facade.getApplicationContainerIdMap();
+ Assert.assertNotNull(appContainerMap);
+ ApplicationId applicationId = attemptId.getApplicationId();
+ Assert.assertNotNull(applicationId);
+ List<ContainerId> sc1ContainerList = appContainerMap.get(applicationId);
+
+ // Release all containers,
+ // Because SC-1 is offline, it is necessary to clean up the Containers allocated by SC-1
+ containers = containers.stream()
+ .filter(container -> !sc1ContainerList.contains(container.getId()))
+ .collect(Collectors.toList());
+ releaseContainersAndAssert(containers);
+ }
+
+ private void finishApplication() throws IOException, YarnException {
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ FinishApplicationMasterResponse finishResponse =
+ interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finishResponse);
+ Assert.assertEquals(true, finishResponse.getIsUnregistered());
Review Comment:
assertTrue
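Presumably this means asserting the boolean result directly instead of comparing it to true; a minimal sketch of the suggested change:

    // suggested (sketch): assert the boolean result directly
    Assert.assertTrue(finishResponse.getIsUnregistered());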
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1242,4 +1255,181 @@ public void testRemoveAppFromRegistryApplicationFailed()
return null;
});
}
+
+ public void testRecoverWithBadSubCluster(final RegistryOperations registryObj)
+ throws IOException, InterruptedException {
+
+ UserGroupInformation ugi =
+ interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+ // Prepare a list of subclusters
+ List<SubClusterId> subClusterIds = new ArrayList<>();
+ SubClusterId sc1 = SubClusterId.newInstance("SC-1");
+ SubClusterId sc2 = SubClusterId.newInstance("SC-2");
+ SubClusterId homeSC = SubClusterId.newInstance(HOME_SC_ID);
+ subClusterIds.add(sc1);
+ subClusterIds.add(sc2);
+ subClusterIds.add(homeSC);
+
+ // Prepare AMRMProxy Context
+ AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext,
+ getConf(), attemptId, "test-user", null, null, null, registryObj);
+
+ // Prepare RegisterApplicationMasterRequest
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+ // Step1. Prepare subClusters SC-1, SC-2, HomeSC and Interceptor
+ initSubClusterAndInterceptor(subClusterIds, registryObj);
+
+ // Step2. Register Application And Assign Containers
+ List<Container> containers = registerApplicationAndAssignContainers(registerReq);
+
+ // Step3. Offline SC-1 cluster
+ offlineSubClusterSC1(sc1);
+
+ // Step4. Recover ApplicationMaster
+ recoverApplicationMaster(appContext);
+
+ // Step5. We recovered ApplicationMaster.
+ // SC-1 was offline, SC-2 was recovered at this time, UnmanagedAMPool.size=1 and only SC-2
+ UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool();
+ Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+ Assert.assertNotNull(allUAMIds);
+ Assert.assertTrue(allUAMIds.size() == 1);
+ Assert.assertTrue(allUAMIds.contains(sc2.getId()));
+
+ // Step6. The first allocate call expects a fail-over exception and re-register.
+ AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(0);
+ LambdaTestUtils.intercept(ApplicationMasterNotRegisteredException.class,
+ "AMRMProxy just restarted and recovered for " + this.attemptId +
+ ". AM should re-register and full re-send pending requests.",
+ () -> interceptor.allocate(allocateRequest));
+ interceptor.registerApplicationMaster(registerReq);
+
+ // Step7. release Containers
+ releaseContainers(containers, sc1);
+
+ // Step8. finish application
+ finishApplication();
+
+ return null;
+ });
+ }
+
+ private void initSubClusterAndInterceptor(List<SubClusterId> subClusterIds,
+ RegistryOperations registryObj) throws YarnException {
+ // Prepare subClusters SC-1, SC-2, HomeSC
+ for (SubClusterId subClusterId : subClusterIds) {
+ registerSubCluster(subClusterId);
+ }
+
+ // Prepare Interceptor
+ interceptor = new TestableFederationInterceptor();
+ AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext,
+ getConf(), attemptId, "test-user", null, null, null, registryObj);
+ interceptor.init(appContext);
+ interceptor.cleanupRegistry();
+ }
+
+ private List<Container> registerApplicationAndAssignContainers(
+ RegisterApplicationMasterRequest registerReq) throws Exception {
+
+ // Register HomeSC
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+
+ // We only registered HomeSC, so UnmanagedAMPoolSize should be empty
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // We assign 3 Containers to each cluster
+ int numberOfContainers = 3;
+ List<Container> containers =
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 3);
+
+ // At this point, UnmanagedAMPoolSize should be equal to 2 and should contain SC-1, SC-2
+ Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+ UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool();
+ Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+ Assert.assertNotNull(allUAMIds);
+ Assert.assertTrue(allUAMIds.size() == 2);
Review Comment:
assertEquals
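Presumably this means comparing the collection size with assertEquals, which also gives a clearer failure message; a minimal sketch of the suggested change:

    // suggested (sketch): compare the size directly instead of asserting a boolean expression
    Assert.assertEquals(2, allUAMIds.size());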
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1242,4 +1255,181 @@ public void testRemoveAppFromRegistryApplicationFailed()
return null;
});
}
+
+ public void testRecoverWithBadSubCluster(final RegistryOperations registryObj)
+ throws IOException, InterruptedException {
+
+ UserGroupInformation ugi =
+ interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+ // Prepare a list of subclusters
+ List<SubClusterId> subClusterIds = new ArrayList<>();
+ SubClusterId sc1 = SubClusterId.newInstance("SC-1");
+ SubClusterId sc2 = SubClusterId.newInstance("SC-2");
+ SubClusterId homeSC = SubClusterId.newInstance(HOME_SC_ID);
+ subClusterIds.add(sc1);
+ subClusterIds.add(sc2);
+ subClusterIds.add(homeSC);
+
+ // Prepare AMRMProxy Context
+ AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext,
+ getConf(), attemptId, "test-user", null, null, null, registryObj);
+
+ // Prepare RegisterApplicationMasterRequest
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+ // Step1. Prepare subClusters SC-1, SC-2, HomeSC and Interceptor
+ initSubClusterAndInterceptor(subClusterIds, registryObj);
+
+ // Step2. Register Application And Assign Containers
+ List<Container> containers = registerApplicationAndAssignContainers(registerReq);
+
+ // Step3. Offline SC-1 cluster
+ offlineSubClusterSC1(sc1);
+
+ // Step4. Recover ApplicationMaster
+ recoverApplicationMaster(appContext);
+
+ // Step5. We recovered ApplicationMaster.
+ // SC-1 was offline, SC-2 was recovered at this time, UnmanagedAMPool.size=1 and only SC-2
+ UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool();
+ Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+ Assert.assertNotNull(allUAMIds);
+ Assert.assertTrue(allUAMIds.size() == 1);
Review Comment:
assertEquals
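Likewise for the size check in Step5; a minimal sketch of the suggested change:

    // suggested (sketch): compare the size directly
    Assert.assertEquals(1, allUAMIds.size());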
> Refactor NM#FederationInterceptor#recover Code
> ----------------------------------------------
>
> Key: YARN-11306
> URL: https://issues.apache.org/jira/browse/YARN-11306
> Project: Hadoop YARN
> Issue Type: Improvement
> Components: federation, nodemanager
> Affects Versions: 3.4.0
> Reporter: fanshilun
> Assignee: fanshilun
> Priority: Major
> Labels: pull-request-available
>
> Refactor NM#FederationInterceptor#recover Code
> 1. Enhance code readability.
> 2. Add empty checks.
> 3. When an exception is encountered, completely clean up that SubCluster's data (see the sketch below).
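The description above is brief; as an illustration only, here is a minimal sketch of the pattern items 2 and 3 describe. It is not the actual FederationInterceptor#recover code, and recoverSubClusterUAM/cleanupSubCluster are hypothetical helper names:

    // Hypothetical sketch: recover each subcluster with an empty check,
    // and fully clean up a subcluster's recovered state if its recovery fails.
    private void recoverSubClusters(Map<String, byte[]> recoveredDataMap) {
      for (Map.Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
        String subClusterId = entry.getKey();
        byte[] data = entry.getValue();
        if (data == null || data.length == 0) {
          continue; // empty check: nothing to recover for this subcluster
        }
        try {
          recoverSubClusterUAM(subClusterId, data); // hypothetical helper
        } catch (Exception e) {
          // on failure, remove this subcluster's recovered data completely
          // so no partially recovered state is left behind
          cleanupSubCluster(subClusterId); // hypothetical helper
        }
      }
    }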
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]