[ 
https://issues.apache.org/jira/browse/YARN-11306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17606752#comment-17606752
 ] 

ASF GitHub Bot commented on YARN-11306:
---------------------------------------

goiri commented on code in PR #4897:
URL: https://github.com/apache/hadoop/pull/4897#discussion_r974657324


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -463,42 +461,67 @@ public void recover(Map<String, byte[]> recoveredDataMap) 
{
       // map as well.
       UserGroupInformation appSubmitter;
       if (UserGroupInformation.isSecurityEnabled()) {
-        appSubmitter = 
UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
+        appSubmitter = UserGroupInformation.createProxyUser(user,
             UserGroupInformation.getLoginUser());
       } else {
-        appSubmitter = 
UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
+        appSubmitter = UserGroupInformation.createRemoteUser(user);
       }
-      ApplicationClientProtocol rmClient =
-          createHomeRMProxy(getApplicationContext(),
-              ApplicationClientProtocol.class, appSubmitter);
 
-      GetContainersResponse response = rmClient
-          .getContainers(GetContainersRequest.newInstance(this.attemptId));
+      ApplicationClientProtocol rmClient = 
createHomeRMProxy(applicationContext,
+          ApplicationClientProtocol.class, appSubmitter);
+
+      GetContainersRequest request = 
GetContainersRequest.newInstance(this.attemptId);
+      GetContainersResponse response = rmClient.getContainers(request);
+
       for (ContainerReport container : response.getContainerList()) {
-        containerIdToSubClusterIdMap.put(container.getContainerId(),
-            this.homeSubClusterId);
+        ContainerId containerId = container.getContainerId();
+        containerIdToSubClusterIdMap.put(containerId, this.homeSubClusterId);
         containers++;
-        LOG.debug("  From home RM {} running container {}",
-            this.homeSubClusterId, container.getContainerId());
+        LOG.debug("From home RM {} running container {}.", 
this.homeSubClusterId, containerId);
       }
-      LOG.info("{} running containers including AM recovered from home RM {}",
+      LOG.info("{} running containers including AM recovered from home RM {}.",
           response.getContainerList().size(), this.homeSubClusterId);
 
-      LOG.info(
-          "In all {} UAMs {} running containers including AM recovered for {}",
+      LOG.info("In all {} UAMs {} running containers including AM recovered 
for {}.",
           uamMap.size(), containers, this.attemptId);
 
-      if (this.amRegistrationResponse != null) {
+      if (queue != null) {
         // Initialize the AMRMProxyPolicy
-        String queue = this.amRegistrationResponse.getQueue();
-        this.policyInterpreter =
-            FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
-                getConf(), this.federationFacade, this.homeSubClusterId);
+        queue = this.amRegistrationResponse.getQueue();
+        this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, 
this.policyInterpreter,
+            getConf(), this.federationFacade, this.homeSubClusterId);
       }
     } catch (IOException | YarnException e) {
       throw new YarnRuntimeException(e);
     }
+  }
 
+  private Map<String, Token<AMRMTokenIdentifier>> getSCAMRMTokenIdentifierMap(
+      Map<String, byte[]> recoveredDataMap) throws IOException {
+    Map<String, Token<AMRMTokenIdentifier>> uamMap = new HashMap<>();
+    ApplicationId applicationId = this.attemptId.getApplicationId();
+    if (this.registryClient != null) {
+      uamMap = this.registryClient.loadStateFromRegistry(applicationId);
+      LOG.info("Found {} existing UAMs for application {} in Yarn Registry.",
+          uamMap.size(), applicationId);
+    } else {
+      for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {

Review Comment:
   It might be good to have this large chunk in a separate method.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -463,42 +461,67 @@ public void recover(Map<String, byte[]> recoveredDataMap) 
{
       // map as well.
       UserGroupInformation appSubmitter;
       if (UserGroupInformation.isSecurityEnabled()) {
-        appSubmitter = 
UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
+        appSubmitter = UserGroupInformation.createProxyUser(user,
             UserGroupInformation.getLoginUser());
       } else {
-        appSubmitter = 
UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
+        appSubmitter = UserGroupInformation.createRemoteUser(user);
       }
-      ApplicationClientProtocol rmClient =
-          createHomeRMProxy(getApplicationContext(),
-              ApplicationClientProtocol.class, appSubmitter);
 
-      GetContainersResponse response = rmClient
-          .getContainers(GetContainersRequest.newInstance(this.attemptId));
+      ApplicationClientProtocol rmClient = 
createHomeRMProxy(applicationContext,
+          ApplicationClientProtocol.class, appSubmitter);
+
+      GetContainersRequest request = 
GetContainersRequest.newInstance(this.attemptId);
+      GetContainersResponse response = rmClient.getContainers(request);
+
       for (ContainerReport container : response.getContainerList()) {
-        containerIdToSubClusterIdMap.put(container.getContainerId(),
-            this.homeSubClusterId);
+        ContainerId containerId = container.getContainerId();
+        containerIdToSubClusterIdMap.put(containerId, this.homeSubClusterId);
         containers++;
-        LOG.debug("  From home RM {} running container {}",
-            this.homeSubClusterId, container.getContainerId());
+        LOG.debug("From home RM {} running container {}.", 
this.homeSubClusterId, containerId);
       }
-      LOG.info("{} running containers including AM recovered from home RM {}",
+      LOG.info("{} running containers including AM recovered from home RM {}.",
           response.getContainerList().size(), this.homeSubClusterId);
 
-      LOG.info(
-          "In all {} UAMs {} running containers including AM recovered for {}",
+      LOG.info("In all {} UAMs {} running containers including AM recovered 
for {}.",
           uamMap.size(), containers, this.attemptId);
 
-      if (this.amRegistrationResponse != null) {
+      if (queue != null) {
         // Initialize the AMRMProxyPolicy
-        String queue = this.amRegistrationResponse.getQueue();
-        this.policyInterpreter =
-            FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
-                getConf(), this.federationFacade, this.homeSubClusterId);
+        queue = this.amRegistrationResponse.getQueue();
+        this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, 
this.policyInterpreter,
+            getConf(), this.federationFacade, this.homeSubClusterId);
       }
     } catch (IOException | YarnException e) {
       throw new YarnRuntimeException(e);
     }
+  }
 
+  private Map<String, Token<AMRMTokenIdentifier>> getSCAMRMTokenIdentifierMap(
+      Map<String, byte[]> recoveredDataMap) throws IOException {
+    Map<String, Token<AMRMTokenIdentifier>> uamMap = new HashMap<>();
+    ApplicationId applicationId = this.attemptId.getApplicationId();
+    if (this.registryClient != null) {
+      uamMap = this.registryClient.loadStateFromRegistry(applicationId);
+      LOG.info("Found {} existing UAMs for application {} in Yarn Registry.",

Review Comment:
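   Nit: the log message should say "YARN registry" rather than "Yarn Registry" (YARN is an acronym; "registry" need not be capitalized).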



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1242,4 +1255,130 @@ public void testRemoveAppFromRegistryApplicationFailed()
       return null;
     });
   }
+
+  public void testRecoverWithBadSubCluster(final RegistryOperations 
registryObj) throws IOException, InterruptedException {
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {

Review Comment:
   Since this function is so long, would it make sense to define it as a separate 
method and call that, instead of using a huge nested lambda?





> Refactor NM#FederationInterceptor#recover Code
> ----------------------------------------------
>
>                 Key: YARN-11306
>                 URL: https://issues.apache.org/jira/browse/YARN-11306
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: federation, nodemanager
>    Affects Versions: 3.4.0
>            Reporter: fanshilun
>            Assignee: fanshilun
>            Priority: Major
>              Labels: pull-request-available
>
> Refactor NM#FederationInterceptor#recover Code
> 1. Enhance code readability.
> 2. Add null/empty checks.
> 3. When an exception is encountered, completely destroy the SubCluster's data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to