[
https://issues.apache.org/jira/browse/YARN-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171270#comment-16171270
]
zhangshilong commented on YARN-7214:
------------------------------------
3.
{code:java}
public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
List<NMContainerStatus> containers = null;
NodeId nodeId = rmNode.nodeId;
RMNode previousRMNode =
rmNode.context.getInactiveRMNodes().remove(nodeId);
if (previousRMNode != null) {
rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
} else {
NodeId unknownNodeId =
NodesListManager.createUnknownNodeId(nodeId.getHost());
previousRMNode =
rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
if (previousRMNode != null) {
ClusterMetrics.getMetrics().decrDecommisionedNMs();
}
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING ||
container.getContainerState() == ContainerState.SCHEDULED) {
rmNode.launchedContainers.add(container.getContainerId());
}
}
}
}
if (null != startEvent.getRunningApplications()) {
for (ApplicationId appId : startEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}
}
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeAddedSchedulerEvent(rmNode, containers));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
}
}
{code}
4、 in NodeStatusUpdaterImpl.java
before register: getNMContainerStatuses will be called. So
completedContainer will be put into recentlyStoppedContainers.
in register request: completed containers will be sent to RM.
{code:java}
public void addCompletedContainer(ContainerId containerId) {
synchronized (recentlyStoppedContainers) {
removeVeryOldStoppedContainersFromCache();
if (!recentlyStoppedContainers.containsKey(containerId)) {
recentlyStoppedContainers.put(containerId,
System.currentTimeMillis() + durationToTrackStoppedContainers);
}
}
}
{code}
normal heartbeat, getContainerStatuses is called.
So completed container will not be put into containerStatuses beacause it is in
recentlyStoppedContainers.
So completed container will not be sent to RM.
{code:java}
protected List<ContainerStatus> getContainerStatuses() throws IOException {
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
for (Container container : this.context.getContainers().values()) {
ContainerId containerId = container.getContainerId();
ApplicationId applicationId = containerId.getApplicationAttemptId()
.getApplicationId();
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
if (containerStatus.getState() == ContainerState.COMPLETE) {
if (isApplicationStopped(applicationId)) {
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is completing, " + " remove "
+ containerId + " from NM context.");
}
context.getContainers().remove(containerId);
pendingCompletedContainers.put(containerId, containerStatus);
} else {
if (!isContainerRecentlyStopped(containerId)) {
pendingCompletedContainers.put(containerId, containerStatus);
}
}
// Adding to finished containers cache. Cache will keep it around at
// least for #durationToTrackStoppedContainers duration. In the
// subsequent call to stop container it will get removed from cache.
addCompletedContainer(containerId);
} else {
containerStatuses.add(containerStatus);
}
}
containerStatuses.addAll(pendingCompletedContainers.values());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out " + containerStatuses.size()
+ " container statuses: " + containerStatuses);
}
return containerStatuses;
}
{code}
> duplicated container completed To AM
> ------------------------------------
>
> Key: YARN-7214
> URL: https://issues.apache.org/jira/browse/YARN-7214
> Project: Hadoop YARN
> Issue Type: Bug
> Affects Versions: 2.7.1, 3.0.0-alpha3
> Environment: hadoop 2.7.1 rm recovery and nm recovery enabled
> Reporter: zhangshilong
>
> env: hadoop 2.7.1 with rm recovery and nm recovery enabled
> case:
> spark app(app1) running least one container(named c1) in NM1.
> 1、NM1 crashed,and RM found NM1 expired in 10 minutes.
> 2、RM will remove all containers in NM1(RMNodeImpl). and app1 will receive
> c1 completed message.But RM can not send c1(to be removed) to NM1 because NM1
> lost.
> 3、NM1 restart and register with RM(c1 in register request),but RM found NM1
> is lost and will not handle containers from NM1.
> 4、NM1 will not heartbeat with c1(c1 not in heartbeat request). So c1 will
> not removed from context of NM1.
> 5、 RM restart, NM1 re register with RM。And c1 will be handled and recovered.
> RM will send c1 complted message to AM of app1. So, app1 received duplicated
> c1.
> once spark AM receive one container completed from RM, it will allocate one
> new container.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]