[
https://issues.apache.org/jira/browse/YARN-6667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17586094#comment-17586094
]
ASF GitHub Bot commented on YARN-6667:
--------------------------------------
goiri commented on code in PR #4810:
URL: https://github.com/apache/hadoop/pull/4810#discussion_r956603434
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -1545,18 +1546,66 @@ private void cacheAllocatedContainers(List<Container> containers,
+ " from same sub-cluster: {}, so ignoring.",
container.getId(), subClusterId);
} else {
+
+ LOG.info("Duplicate containerID found in the allocated containers. "
+
+ "try to re-pick the sub-cluster.");
+
// The same container allocation from different sub-clusters,
// something is wrong.
- // TODO: YARN-6667 if some subcluster RM is configured wrong, we
- // should not fail the entire heartbeat.
- throw new YarnRuntimeException(
- "Duplicate containerID found in the allocated containers. This"
- + " can happen if the RM epoch is not configured properly."
- + " ContainerId: " + container.getId().toString()
- + " ApplicationId: " + this.attemptId + " From RM: "
- + subClusterId
- + " . Previous container was from sub-cluster: "
- + existingSubClusterId);
+ try {
+
+ Set<SubClusterId> timeOutScs = getTimedOutSCs(true);
+ SubClusterInfo existingSubCluster =
+ federationFacade.getSubCluster(existingSubClusterId);
+ SubClusterInfo newSubCluster =
federationFacade.getSubCluster(subClusterId);
+
+ boolean existAllocatedScHealth = true;
+ boolean newAllocatedScHealth = true;
+
+ // Previous SubCluster Time Out Or Unusable, Can't Continue to use.
+ if (timeOutScs.contains(existingSubClusterId) ||
+ existingSubCluster == null ||
existingSubCluster.getState().isUnusable()) {
+ existAllocatedScHealth = false;
+ }
+
+ // New SubCluster Time Out Or Unusable, Can't Continue to use.
+ if (timeOutScs.contains(existingSubClusterId) ||
+ newSubCluster == null ||
newSubCluster.getState().isUnusable()) {
+ newAllocatedScHealth = false;
+ }
+
+ // If the previous RM which allocated Container is normal,
+ // the previous RM will be used first
+ if ((existAllocatedScHealth && !newAllocatedScHealth) ||
+ (existAllocatedScHealth && newAllocatedScHealth)) {
+ LOG.info("Use Previous Allocated Container's subCluster. " +
+ "ContainerId: {} ApplicationId: {} From RM: {}.",
this.attemptId,
+ container.getId(), existingSubClusterId);
+ continue;
Review Comment:
Why do we need a continue?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -1545,18 +1546,66 @@ private void cacheAllocatedContainers(List<Container> containers,
+ " from same sub-cluster: {}, so ignoring.",
container.getId(), subClusterId);
} else {
+
+ LOG.info("Duplicate containerID found in the allocated containers. "
+
+ "try to re-pick the sub-cluster.");
+
// The same container allocation from different sub-clusters,
// something is wrong.
- // TODO: YARN-6667 if some subcluster RM is configured wrong, we
- // should not fail the entire heartbeat.
- throw new YarnRuntimeException(
- "Duplicate containerID found in the allocated containers. This"
- + " can happen if the RM epoch is not configured properly."
- + " ContainerId: " + container.getId().toString()
- + " ApplicationId: " + this.attemptId + " From RM: "
- + subClusterId
- + " . Previous container was from sub-cluster: "
- + existingSubClusterId);
+ try {
+
+ Set<SubClusterId> timeOutScs = getTimedOutSCs(true);
+ SubClusterInfo existingSubCluster =
+ federationFacade.getSubCluster(existingSubClusterId);
+ SubClusterInfo newSubCluster =
federationFacade.getSubCluster(subClusterId);
+
+ boolean existAllocatedScHealth = true;
+ boolean newAllocatedScHealth = true;
+
+ // Previous SubCluster Time Out Or Unusable, Can't Continue to use.
+ if (timeOutScs.contains(existingSubClusterId) ||
+ existingSubCluster == null ||
existingSubCluster.getState().isUnusable()) {
+ existAllocatedScHealth = false;
+ }
+
+ // New SubCluster Time Out Or Unusable, Can't Continue to use.
+ if (timeOutScs.contains(existingSubClusterId) ||
+ newSubCluster == null ||
newSubCluster.getState().isUnusable()) {
+ newAllocatedScHealth = false;
+ }
+
+ // If the previous RM which allocated Container is normal,
+ // the previous RM will be used first
+ if ((existAllocatedScHealth && !newAllocatedScHealth) ||
Review Comment:
Isn't this condition repetitive? Wouldn't it just be `if (existAllocatedScHealth)`?
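
The simplification can be checked mechanically. A minimal, self-contained sketch (the class and method names are hypothetical and not part of the PR) that compares the quoted condition with the shorter form over all four input combinations:

```java
final class DuplicateConditionCheck {
  // The condition as written in the quoted diff.
  static boolean original(boolean existHealthy, boolean newHealthy) {
    return (existHealthy && !newHealthy) || (existHealthy && newHealthy);
  }

  // The shorter form suggested in the review comment.
  static boolean simplified(boolean existHealthy, boolean newHealthy) {
    return existHealthy;
  }

  // Returns true if both forms agree on every combination of inputs.
  static boolean equivalent() {
    boolean[] values = {false, true};
    for (boolean e : values) {
      for (boolean n : values) {
        if (original(e, n) != simplified(e, n)) {
          return false;
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println("equivalent: " + equivalent());
  }
}
```

The two forms agree because `(a && !b) || (a && b)` factors to `a && (!b || b)`, which is just `a`.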
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -1545,18 +1546,66 @@ private void cacheAllocatedContainers(List<Container> containers,
+ " from same sub-cluster: {}, so ignoring.",
container.getId(), subClusterId);
} else {
+
+ LOG.info("Duplicate containerID found in the allocated containers. "
+
+ "try to re-pick the sub-cluster.");
+
// The same container allocation from different sub-clusters,
// something is wrong.
- // TODO: YARN-6667 if some subcluster RM is configured wrong, we
- // should not fail the entire heartbeat.
- throw new YarnRuntimeException(
- "Duplicate containerID found in the allocated containers. This"
- + " can happen if the RM epoch is not configured properly."
- + " ContainerId: " + container.getId().toString()
- + " ApplicationId: " + this.attemptId + " From RM: "
- + subClusterId
- + " . Previous container was from sub-cluster: "
- + existingSubClusterId);
+ try {
+
+ Set<SubClusterId> timeOutScs = getTimedOutSCs(true);
+ SubClusterInfo existingSubCluster =
+ federationFacade.getSubCluster(existingSubClusterId);
+ SubClusterInfo newSubCluster =
federationFacade.getSubCluster(subClusterId);
+
+ boolean existAllocatedScHealth = true;
+ boolean newAllocatedScHealth = true;
+
+ // Previous SubCluster Time Out Or Unusable, Can't Continue to use.
+ if (timeOutScs.contains(existingSubClusterId) ||
Review Comment:
I think we could unify these two ifs a little.
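
One possible way to unify the two checks is a small helper that takes the sub-cluster id as a parameter. This is only a sketch: `SubClusterHealthSketch`, `Info`, and `isHealthy` are hypothetical names, `String` ids stand in for `SubClusterId`, and `Info` stands in for `SubClusterInfo` with its `getState().isUnusable()` check collapsed into a single method.

```java
import java.util.Set;

final class SubClusterHealthSketch {
  // Hypothetical stand-in for SubClusterInfo; only the usability check is modeled.
  interface Info {
    boolean isUnusable();
  }

  // Healthy means: not timed out, known to the federation state, and usable.
  static boolean isHealthy(Set<String> timedOut, String id, Info info) {
    return !timedOut.contains(id) && info != null && !info.isUnusable();
  }

  public static void main(String[] args) {
    Set<String> timedOut = Set.of("SC-1");
    Info usable = () -> false;
    System.out.println(isHealthy(timedOut, "SC-1", usable));
    System.out.println(isHealthy(timedOut, "SC-2", usable));
  }
}
```

With the id as a parameter, each call site passes its own id (`existingSubClusterId` or `subClusterId`). The second check in the diff passes `existingSubClusterId` to `timeOutScs.contains`, which may be a copy-paste slip that a shared helper would make harder to repeat.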
> Handle containerId duplicate without failing the heartbeat in Federation Interceptor
> ------------------------------------------------------------------------------------
>
> Key: YARN-6667
> URL: https://issues.apache.org/jira/browse/YARN-6667
> Project: Hadoop YARN
> Issue Type: Sub-task
> Reporter: Botong Huang
> Assignee: fanshilun
> Priority: Minor
> Labels: pull-request-available
>
> In practice, the probability of this happening is very low.
> It can only be caused by a YARN master-slave failover and an incorrect
> epoch parameter configuration.
> We will try to tolerate this situation and keep the application running
> where possible, using the following measures:
> 1. Select an RM whose heartbeat has not timed out and which is in the
> RUNNING state.
> 2. If neither RM's heartbeat has timed out and both are in the RUNNING
> state, select the RM that previously allocated the container.
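
The two measures above can be sketched as a small decision function. This is an illustration only: `SubClusterPickSketch`, `Choice`, and `pick` are hypothetical names, and returning an empty result when neither RM is healthy is an assumption, since the description does not cover that case.

```java
import java.util.Optional;

final class SubClusterPickSketch {
  enum Choice { PREVIOUS, NEW }

  // "Healthy" here means: heartbeat not timed out and state RUNNING.
  static Optional<Choice> pick(boolean previousHealthy, boolean newHealthy) {
    if (previousHealthy) {
      // Covers measure 2 (both healthy) and the case where only the previous RM is healthy.
      return Optional.of(Choice.PREVIOUS);
    }
    if (newHealthy) {
      // Measure 1: only the new RM is healthy.
      return Optional.of(Choice.NEW);
    }
    // Assumption: the description does not say what to do when neither is healthy.
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(pick(true, true));
    System.out.println(pick(false, true));
    System.out.println(pick(false, false));
  }
}
```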