[ 
https://issues.apache.org/jira/browse/YARN-11509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733145#comment-17733145
 ] 

ASF GitHub Bot commented on YARN-11509:
---------------------------------------

goiri commented on code in PR #5727:
URL: https://github.com/apache/hadoop/pull/5727#discussion_r1231220115


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java:
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.security.token.Token;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+public class TokenAndRegisterResponse {

Review Comment:
   Add javadoc.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -1251,90 +1252,109 @@ private List<SubClusterId> 
registerAndAllocateWithNewSubClusters(
     // Check to see if there are any new sub-clusters in this request
     // list and create and register Unmanaged AM instance for the new ones
     List<SubClusterId> newSubClusters = new ArrayList<>();
-    for (SubClusterId subClusterId : requests.keySet()) {
-      if (!subClusterId.equals(this.homeSubClusterId)
-          && !this.uamPool.hasUAMId(subClusterId.getId())) {
-        newSubClusters.add(subClusterId);
 
+    requests.keySet().stream().forEach(subClusterId -> {
+      String id = subClusterId.getId();
+      if (!subClusterId.equals(this.homeSubClusterId) && 
!this.uamPool.hasUAMId(id)) {
+        newSubClusters.add(subClusterId);
         // Set sub-cluster to be timed out initially
-        lastSCResponseTime.put(subClusterId,
-            clock.getTime() - subClusterTimeOut);
+        lastSCResponseTime.put(subClusterId, clock.getTime() - 
subClusterTimeOut);
       }
-    }
+    });
 
     this.uamRegisterFutures.clear();
+
     for (final SubClusterId scId : newSubClusters) {
-      Future<?> future = this.threadpool.submit(new Runnable() {
-        @Override
-        public void run() {
-          String subClusterId = scId.getId();
-
-          // Create a config loaded with federation on and subclusterId
-          // for each UAM
-          YarnConfiguration config = new YarnConfiguration(getConf());
-          FederationProxyProviderUtil.updateConfForFederation(config,
-              subClusterId);
-
-          RegisterApplicationMasterResponse uamResponse = null;
-          Token<AMRMTokenIdentifier> token = null;
-          try {
-            ApplicationId applicationId = attemptId.getApplicationId();
-            ApplicationSubmissionContext originalSubmissionContext =
-                
federationFacade.getApplicationSubmissionContext(applicationId);
-
-            // For appNameSuffix, use subClusterId of the home sub-cluster
-            token = uamPool.launchUAM(subClusterId, config,
-                applicationId, amRegistrationResponse.getQueue(),
-                getApplicationContext().getUser(), homeSubClusterId.toString(),
-                true, subClusterId, originalSubmissionContext);
-
-            secondaryRelayers.put(subClusterId,
-                uamPool.getAMRMClientRelayer(subClusterId));
-
-            uamResponse = uamPool.registerApplicationMaster(subClusterId,
-                amRegistrationRequest);
-          } catch (Throwable e) {
-            LOG.error("Failed to register application master: " + subClusterId
-                + " Application: " + attemptId, e);
-            // TODO: UAM registration for this sub-cluster RM
-            // failed. For now, we ignore the resource requests and continue
-            // but we need to fix this and handle this situation. One way would
-            // be to send the request to another RM by consulting the policy.
-            return;
-          }
-          uamRegistrations.put(scId, uamResponse);
-          LOG.info("Successfully registered unmanaged application master: "
-              + subClusterId + " ApplicationId: " + attemptId);
 
-          try {
-            uamPool.allocateAsync(subClusterId, requests.get(scId),
-                new HeartbeatCallBack(scId, true));
-          } catch (Throwable e) {
-            LOG.error("Failed to allocate async to " + subClusterId
-                + " Application: " + attemptId, e);
-          }
+      Future<?> future = this.threadpool.submit(() -> {
 
-          // Save the UAM token in registry or NMSS
-          try {
-            if (registryClient != null) {
-              registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
-                  subClusterId, token);
-            } else if (getNMStateStore() != null) {
-              getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
-                  NMSS_SECONDARY_SC_PREFIX + subClusterId,
-                  token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
-            }
-          } catch (Throwable e) {
-            LOG.error("Failed to persist UAM token from " + subClusterId
-                + " Application: " + attemptId, e);
+        String subClusterId = scId.getId();
+
+        // Create a config loaded with federation on and subclusterId
+        // for each UAM
+        YarnConfiguration config = new YarnConfiguration(getConf());
+        FederationProxyProviderUtil.updateConfForFederation(config, 
subClusterId);
+        ApplicationId applicationId = attemptId.getApplicationId();
+
+        RegisterApplicationMasterResponse uamResponse;
+        Token<AMRMTokenIdentifier> token;
+
+        // LaunchUAM And RegisterApplicationMaster
+        try {
+

Review Comment:
   Avoid this blank line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1432,4 +1432,53 @@ private void finishApplication() throws IOException, 
YarnException {
     Assert.assertNotNull(finishResponse);
     Assert.assertTrue(finishResponse.getIsUnregistered());
   }
+
+  @Test
+  public void testLaunchUAMAndRegisterApplicationMasterRetry() throws 
Exception {
+
+    UserGroupInformation ugi = 
interceptor.getUGIWithToken(interceptor.getAttemptId());
+    interceptor.setRetryCount(2);
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+      // Register the application
+      RegisterApplicationMasterRequest registerReq =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerReq.setHost(Integer.toString(testAppId));
+      registerReq.setRpcPort(0);
+      registerReq.setTrackingUrl("");
+
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 active
+      registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+      int numberOfContainers = 3;
+      List<Container> containers = getContainersAndAssert(numberOfContainers, 
numberOfContainers);
+      Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+      // Release all containers
+      releaseContainersAndAssert(containers);
+
+      // Finish the application
+      FinishApplicationMasterRequest finishReq =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishReq.setDiagnostics("");
+      finishReq.setTrackingUrl("");
+      finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+      FinishApplicationMasterResponse finishResponse =
+          interceptor.finishApplicationMaster(finishReq);
+      Assert.assertNotNull(finishResponse);
+      Assert.assertEquals(true, finishResponse.getIsUnregistered());

Review Comment:
   Use Assert.assertTrue(...) here instead of Assert.assertEquals(true, ...).





> Add retry logic to FederationInterceptor#launchUAM.
> ---------------------------------------------------
>
>                 Key: YARN-11509
>                 URL: https://issues.apache.org/jira/browse/YARN-11509
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: amrmproxy
>    Affects Versions: 3.4.0
>            Reporter: Shilun Fan
>            Assignee: Shilun Fan
>            Priority: Minor
>              Labels: pull-request-available
>
> There is a "TODO" in the 
> FederationInterceptor#registerAndAllocateWithNewSubClusters method. According 
> to the "TODO" description, a failed request should be retried against other 
> sub-clusters, but modifying the "requests" parameter inside 
> registerAndAllocateWithNewSubClusters is not a good approach. It is better 
> to add retry logic here instead. 
> We don't need to worry about losing requests: when a request cannot 
> be satisfied, the application's AM will keep asking for the resources, and 
> those requests will then be routed to other sub-clusters for execution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to