[
https://issues.apache.org/jira/browse/YARN-11306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17606752#comment-17606752
]
ASF GitHub Bot commented on YARN-11306:
---------------------------------------
goiri commented on code in PR #4897:
URL: https://github.com/apache/hadoop/pull/4897#discussion_r974657324
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -463,42 +461,67 @@ public void recover(Map<String, byte[]> recoveredDataMap)
{
// map as well.
UserGroupInformation appSubmitter;
if (UserGroupInformation.isSecurityEnabled()) {
- appSubmitter =
UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
+ appSubmitter = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getLoginUser());
} else {
- appSubmitter =
UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
+ appSubmitter = UserGroupInformation.createRemoteUser(user);
}
- ApplicationClientProtocol rmClient =
- createHomeRMProxy(getApplicationContext(),
- ApplicationClientProtocol.class, appSubmitter);
- GetContainersResponse response = rmClient
- .getContainers(GetContainersRequest.newInstance(this.attemptId));
+ ApplicationClientProtocol rmClient =
createHomeRMProxy(applicationContext,
+ ApplicationClientProtocol.class, appSubmitter);
+
+ GetContainersRequest request =
GetContainersRequest.newInstance(this.attemptId);
+ GetContainersResponse response = rmClient.getContainers(request);
+
for (ContainerReport container : response.getContainerList()) {
- containerIdToSubClusterIdMap.put(container.getContainerId(),
- this.homeSubClusterId);
+ ContainerId containerId = container.getContainerId();
+ containerIdToSubClusterIdMap.put(containerId, this.homeSubClusterId);
containers++;
- LOG.debug(" From home RM {} running container {}",
- this.homeSubClusterId, container.getContainerId());
+ LOG.debug("From home RM {} running container {}.",
this.homeSubClusterId, containerId);
}
- LOG.info("{} running containers including AM recovered from home RM {}",
+ LOG.info("{} running containers including AM recovered from home RM {}.",
response.getContainerList().size(), this.homeSubClusterId);
- LOG.info(
- "In all {} UAMs {} running containers including AM recovered for {}",
+ LOG.info("In all {} UAMs {} running containers including AM recovered
for {}.",
uamMap.size(), containers, this.attemptId);
- if (this.amRegistrationResponse != null) {
+ if (queue != null) {
// Initialize the AMRMProxyPolicy
- String queue = this.amRegistrationResponse.getQueue();
- this.policyInterpreter =
- FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
- getConf(), this.federationFacade, this.homeSubClusterId);
+ queue = this.amRegistrationResponse.getQueue();
+ this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue,
this.policyInterpreter,
+ getConf(), this.federationFacade, this.homeSubClusterId);
}
} catch (IOException | YarnException e) {
throw new YarnRuntimeException(e);
}
+ }
+ private Map<String, Token<AMRMTokenIdentifier>> getSCAMRMTokenIdentifierMap(
+ Map<String, byte[]> recoveredDataMap) throws IOException {
+ Map<String, Token<AMRMTokenIdentifier>> uamMap = new HashMap<>();
+ ApplicationId applicationId = this.attemptId.getApplicationId();
+ if (this.registryClient != null) {
+ uamMap = this.registryClient.loadStateFromRegistry(applicationId);
+ LOG.info("Found {} existing UAMs for application {} in Yarn Registry.",
+ uamMap.size(), applicationId);
+ } else {
+ for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
Review Comment:
It might be good to have this large chunk in a separate method.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -463,42 +461,67 @@ public void recover(Map<String, byte[]> recoveredDataMap)
{
// map as well.
UserGroupInformation appSubmitter;
if (UserGroupInformation.isSecurityEnabled()) {
- appSubmitter =
UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
+ appSubmitter = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getLoginUser());
} else {
- appSubmitter =
UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
+ appSubmitter = UserGroupInformation.createRemoteUser(user);
}
- ApplicationClientProtocol rmClient =
- createHomeRMProxy(getApplicationContext(),
- ApplicationClientProtocol.class, appSubmitter);
- GetContainersResponse response = rmClient
- .getContainers(GetContainersRequest.newInstance(this.attemptId));
+ ApplicationClientProtocol rmClient =
createHomeRMProxy(applicationContext,
+ ApplicationClientProtocol.class, appSubmitter);
+
+ GetContainersRequest request =
GetContainersRequest.newInstance(this.attemptId);
+ GetContainersResponse response = rmClient.getContainers(request);
+
for (ContainerReport container : response.getContainerList()) {
- containerIdToSubClusterIdMap.put(container.getContainerId(),
- this.homeSubClusterId);
+ ContainerId containerId = container.getContainerId();
+ containerIdToSubClusterIdMap.put(containerId, this.homeSubClusterId);
containers++;
- LOG.debug(" From home RM {} running container {}",
- this.homeSubClusterId, container.getContainerId());
+ LOG.debug("From home RM {} running container {}.",
this.homeSubClusterId, containerId);
}
- LOG.info("{} running containers including AM recovered from home RM {}",
+ LOG.info("{} running containers including AM recovered from home RM {}.",
response.getContainerList().size(), this.homeSubClusterId);
- LOG.info(
- "In all {} UAMs {} running containers including AM recovered for {}",
+ LOG.info("In all {} UAMs {} running containers including AM recovered
for {}.",
uamMap.size(), containers, this.attemptId);
- if (this.amRegistrationResponse != null) {
+ if (queue != null) {
// Initialize the AMRMProxyPolicy
- String queue = this.amRegistrationResponse.getQueue();
- this.policyInterpreter =
- FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
- getConf(), this.federationFacade, this.homeSubClusterId);
+ queue = this.amRegistrationResponse.getQueue();
+ this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue,
this.policyInterpreter,
+ getConf(), this.federationFacade, this.homeSubClusterId);
}
} catch (IOException | YarnException e) {
throw new YarnRuntimeException(e);
}
+ }
+ private Map<String, Token<AMRMTokenIdentifier>> getSCAMRMTokenIdentifierMap(
+ Map<String, byte[]> recoveredDataMap) throws IOException {
+ Map<String, Token<AMRMTokenIdentifier>> uamMap = new HashMap<>();
+ ApplicationId applicationId = this.attemptId.getApplicationId();
+ if (this.registryClient != null) {
+ uamMap = this.registryClient.loadStateFromRegistry(applicationId);
+ LOG.info("Found {} existing UAMs for application {} in Yarn Registry.",
Review Comment:
Nit: the log message should read "YARN registry." rather than "Yarn Registry."
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1242,4 +1255,130 @@ public void testRemoveAppFromRegistryApplicationFailed()
return null;
});
}
+
+ public void testRecoverWithBadSubCluster(final RegistryOperations
registryObj) throws IOException, InterruptedException {
+ UserGroupInformation ugi =
+ interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
Review Comment:
For such a long function, does it make sense to define it as a
method of its own and call that, instead of using a huge nested lambda?
> Refactor NM#FederationInterceptor#recover Code
> ----------------------------------------------
>
> Key: YARN-11306
> URL: https://issues.apache.org/jira/browse/YARN-11306
> Project: Hadoop YARN
> Issue Type: Improvement
> Components: federation, nodemanager
> Affects Versions: 3.4.0
> Reporter: fanshilun
> Assignee: fanshilun
> Priority: Major
> Labels: pull-request-available
>
> Refactor NM#FederationInterceptor#recover Code
> 1. Enhance code readability.
> 2. Add an empty check.
> 3. When an exception is encountered, completely destroy the data of the SubCluster.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]