[
https://issues.apache.org/jira/browse/YARN-11509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733145#comment-17733145
]
ASF GitHub Bot commented on YARN-11509:
---------------------------------------
goiri commented on code in PR #5727:
URL: https://github.com/apache/hadoop/pull/5727#discussion_r1231220115
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java:
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+public class TokenAndRegisterResponse {
Review Comment:
Add javadoc.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -1251,90 +1252,109 @@ private List<SubClusterId> registerAndAllocateWithNewSubClusters(
// Check to see if there are any new sub-clusters in this request
// list and create and register Unmanaged AM instance for the new ones
List<SubClusterId> newSubClusters = new ArrayList<>();
- for (SubClusterId subClusterId : requests.keySet()) {
- if (!subClusterId.equals(this.homeSubClusterId)
- && !this.uamPool.hasUAMId(subClusterId.getId())) {
- newSubClusters.add(subClusterId);
+ requests.keySet().stream().forEach(subClusterId -> {
+ String id = subClusterId.getId();
+ if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(id)) {
+ newSubClusters.add(subClusterId);
// Set sub-cluster to be timed out initially
- lastSCResponseTime.put(subClusterId,
- clock.getTime() - subClusterTimeOut);
+ lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
}
- }
+ });
this.uamRegisterFutures.clear();
+
for (final SubClusterId scId : newSubClusters) {
- Future<?> future = this.threadpool.submit(new Runnable() {
- @Override
- public void run() {
- String subClusterId = scId.getId();
-
- // Create a config loaded with federation on and subclusterId
- // for each UAM
- YarnConfiguration config = new YarnConfiguration(getConf());
- FederationProxyProviderUtil.updateConfForFederation(config,
- subClusterId);
-
- RegisterApplicationMasterResponse uamResponse = null;
- Token<AMRMTokenIdentifier> token = null;
- try {
- ApplicationId applicationId = attemptId.getApplicationId();
- ApplicationSubmissionContext originalSubmissionContext =
- federationFacade.getApplicationSubmissionContext(applicationId);
-
- // For appNameSuffix, use subClusterId of the home sub-cluster
- token = uamPool.launchUAM(subClusterId, config,
- applicationId, amRegistrationResponse.getQueue(),
- getApplicationContext().getUser(), homeSubClusterId.toString(),
- true, subClusterId, originalSubmissionContext);
-
- secondaryRelayers.put(subClusterId,
- uamPool.getAMRMClientRelayer(subClusterId));
-
- uamResponse = uamPool.registerApplicationMaster(subClusterId,
- amRegistrationRequest);
- } catch (Throwable e) {
- LOG.error("Failed to register application master: " + subClusterId
- + " Application: " + attemptId, e);
- // TODO: UAM registration for this sub-cluster RM
- // failed. For now, we ignore the resource requests and continue
- // but we need to fix this and handle this situation. One way would
- // be to send the request to another RM by consulting the policy.
- return;
- }
- uamRegistrations.put(scId, uamResponse);
- LOG.info("Successfully registered unmanaged application master: "
- + subClusterId + " ApplicationId: " + attemptId);
- try {
- uamPool.allocateAsync(subClusterId, requests.get(scId),
- new HeartbeatCallBack(scId, true));
- } catch (Throwable e) {
- LOG.error("Failed to allocate async to " + subClusterId
- + " Application: " + attemptId, e);
- }
+ Future<?> future = this.threadpool.submit(() -> {
- // Save the UAM token in registry or NMSS
- try {
- if (registryClient != null) {
- registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
- subClusterId, token);
- } else if (getNMStateStore() != null) {
- getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
- NMSS_SECONDARY_SC_PREFIX + subClusterId,
- token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
- }
- } catch (Throwable e) {
- LOG.error("Failed to persist UAM token from " + subClusterId
- + " Application: " + attemptId, e);
+ String subClusterId = scId.getId();
+
+ // Create a config loaded with federation on and subclusterId
+ // for each UAM
+ YarnConfiguration config = new YarnConfiguration(getConf());
+ FederationProxyProviderUtil.updateConfForFederation(config, subClusterId);
+ ApplicationId applicationId = attemptId.getApplicationId();
+
+ RegisterApplicationMasterResponse uamResponse;
+ Token<AMRMTokenIdentifier> token;
+
+ // LaunchUAM And RegisterApplicationMaster
+ try {
+
Review Comment:
Remove this blank line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1432,4 +1432,53 @@ private void finishApplication() throws IOException, YarnException {
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
}
+
+ @Test
+ public void testLaunchUAMAndRegisterApplicationMasterRetry() throws Exception {
+
+ UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+ interceptor.setRetryCount(2);
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+ // Register the application
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(0);
+ registerReq.setTrackingUrl("");
+
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
+
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the first batch of containers, with sc1 active
+ registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+ int numberOfContainers = 3;
+ List<Container> containers = getContainersAndAssert(numberOfContainers, numberOfContainers);
+ Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+ // Release all containers
+ releaseContainersAndAssert(containers);
+
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ FinishApplicationMasterResponse finishResponse =
+ interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finishResponse);
+ Assert.assertEquals(true, finishResponse.getIsUnregistered());
Review Comment:
Use Assert.assertTrue(...) instead of Assert.assertEquals(true, ...).
> Add retry logic to FederationInterceptor#launchUAM.
> ---------------------------------------------------
>
> Key: YARN-11509
> URL: https://issues.apache.org/jira/browse/YARN-11509
> Project: Hadoop YARN
> Issue Type: Improvement
> Components: amrmproxy
> Affects Versions: 3.4.0
> Reporter: Shilun Fan
> Assignee: Shilun Fan
> Priority: Minor
> Labels: pull-request-available
>
> There is a TODO in the
> FederationInterceptor#registerAndAllocateWithNewSubClusters method. The TODO
> suggests sending the request to another sub-cluster when UAM registration
> fails, but doing so would mean modifying the requests parameter inside
> registerAndAllocateWithNewSubClusters, which is not a good approach. It is
> better to add retry logic here instead.
> We don't need to worry about losing requests: when a request cannot be
> satisfied, the application's AM keeps asking for it, and the request will
> then be routed to other sub-clusters for execution.
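
For readers unfamiliar with the pattern, the bounded retry described above can be sketched roughly as follows. This is an illustration only, not the code in PR #5727: the class RetrySketch, the method callWithRetry, and the reading of retryCount as "extra attempts after the first" are all assumptions made for this example, and no Hadoop classes are used.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not part of Hadoop): runs an action and retries it
 * a bounded number of times if it throws.
 */
final class RetrySketch {

  private RetrySketch() {
  }

  /**
   * Calls the action once, then up to retryCount more times on failure.
   * Assumption: retryCount counts retries, so total attempts = retryCount + 1.
   */
  static <T> T callWithRetry(Callable<T> action, int retryCount) throws Exception {
    if (retryCount < 0) {
      throw new IllegalArgumentException("retryCount must be >= 0");
    }
    Exception last = null;
    for (int attempt = 0; attempt <= retryCount; attempt++) {
      try {
        // In the interceptor this step would stand for launching the UAM
        // and registering the application master in one sub-cluster.
        return action.call();
      } catch (Exception e) {
        // Remember the failure and try again until attempts run out.
        last = e;
      }
    }
    // All attempts failed; surface the most recent error to the caller.
    throw last;
  }
}
```

In this sketch the caller still sees the last exception once every attempt has failed, so it can log the error and skip that sub-cluster, as the existing catch block does.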