[
https://issues.apache.org/jira/browse/YARN-4676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15272550#comment-15272550
]
Varun Vasudev commented on YARN-4676:
-------------------------------------
Thanks for the patch [~danzhi]. My apologies for coming in late but I have some
concerns about the patch and the approach.
The code to read the hostnames and timeouts in HostsFileReader is a little
fragile and may lead to problems.
1.
{code}
+ // look ahead for optional timeout values
+ Integer timeout = null;
+ if (i < nodes.length - 1) {
+ timeout = tryParseInteger(nodes[i+1]);
+ }
+ map.put(nodes[i], timeout);
+ // skip the timeout if exist
+ if (timeout != null) {
+ i++;
+ }
{code}
This code assumes that node names are non-numerical, and that assumption is
not correct. As per RFC 1123, you can have hostnames made up entirely of
digits. It also looks like we decommission nodes based on hostname only,
whereas it is possible to run multiple nodemanagers on a node; this is
probably something we can revisit later.
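To illustrate the concern, here is a minimal standalone sketch that mirrors the look-ahead logic quoted above. The class name LookAheadAmbiguity and the sample tokens are made up for illustration and are not part of the patch. It shows how an all-digit hostname that follows another hostname gets swallowed as a timeout.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LookAheadAmbiguity {
  // Mirrors the patch's helper: returns null when the token is not an integer.
  static Integer tryParseInteger(String str) {
    try {
      return Integer.parseInt(str);
    } catch (NumberFormatException e) {
      return null;
    }
  }

  // Simplified stand-in for the look-ahead loop in the quoted snippet.
  static Map<String, Integer> parse(String[] nodes) {
    Map<String, Integer> map = new LinkedHashMap<>();
    for (int i = 0; i < nodes.length; i++) {
      Integer timeout = null;
      if (i < nodes.length - 1) {
        timeout = tryParseInteger(nodes[i + 1]);
      }
      map.put(nodes[i], timeout);
      // Skip the next token if it was consumed as a timeout.
      if (timeout != null) {
        i++;
      }
    }
    return map;
  }

  public static void main(String[] args) {
    // Intended meaning: three hosts, none with a timeout.
    // "1234" is a (hypothetical) all-digit hostname.
    Map<String, Integer> parsed = parse(new String[] {"host1", "1234", "host2"});
    // Prints {host1=1234, host2=null}: host "1234" is lost.
    System.out.println(parsed);
  }
}
```

With this input the host "1234" never makes it into the map, and host1 silently picks up a 1234 second timeout.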
2.
{code}
+ map.put(nodes[i], timeout);
{code}
{code}
+ private static Integer tryParseInteger(String str) {
+ try{
+ int num = Integer.parseInt(str);
+ return num;
+ } catch (Exception e) {
+ return null;
+ }
+ }
{code}
Is it possible for us to use -1 instead of null to specify that a timeout
wasn't specified?
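A rough sketch of what I have in mind, assuming negative values are never valid timeouts. The names TimeoutSentinel, NO_TIMEOUT and parseTimeout are hypothetical and do not exist in the patch.

```java
public class TimeoutSentinel {
  // Hypothetical sentinel meaning "no timeout was specified".
  static final int NO_TIMEOUT = -1;

  // Returns the parsed timeout, or NO_TIMEOUT when the token is missing,
  // not an integer, or negative. Callers work with a primitive int and
  // never need a null check.
  static int parseTimeout(String str) {
    try {
      int value = Integer.parseInt(str);
      return value < 0 ? NO_TIMEOUT : value;
    } catch (NumberFormatException e) {
      // Integer.parseInt also throws NumberFormatException for null input.
      return NO_TIMEOUT;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseTimeout("3600"));  // 3600
    System.out.println(parseTimeout("host1")); // -1
    System.out.println(parseTimeout(null));    // -1
  }
}
```

A Map<String, Integer> can still hold these values; they are simply never null.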
3.
{code}
+ private static void prettyLogMap(
+ String type, Map<String, Integer> excludes, String filename) {
+ if (excludes.size() == 0) {
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ for (Entry<String, Integer> n : excludes.entrySet()) {
+ if (n.getValue() != null) {
+ sb.append(String.format("%n %s : %d", n.getKey(), n.getValue()));
+ } else {
+ sb.append(String.format("%n %s", n.getKey()));
+ }
+ }
+ LOG.info("List of " + type + " hosts from " + filename + sb.toString());
+ }
{code}
Instead of %n, can we just print all the hosts on a single line so that we can
use grep to filter out the lines?
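Something along these lines would do. This is only a sketch: the class name SingleLineHostLog, the "host:timeout" layout and the sample file name are my assumptions, not an agreed format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SingleLineHostLog {
  // Builds one log line such as
  // "List of exclude hosts from exclude.txt: host1:3600, host2".
  static String formatHosts(
      String type, Map<String, Integer> hosts, String filename) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Integer> e : hosts.entrySet()) {
      if (sb.length() > 0) {
        sb.append(", ");
      }
      sb.append(e.getKey());
      // Append the timeout only when one was specified.
      if (e.getValue() != null) {
        sb.append(':').append(e.getValue());
      }
    }
    return "List of " + type + " hosts from " + filename + ": " + sb;
  }

  public static void main(String[] args) {
    Map<String, Integer> excludes = new LinkedHashMap<>();
    excludes.put("host1", 3600);
    excludes.put("host2", null);
    // The file name is a placeholder.
    System.out.println(formatHosts("exclude", excludes, "exclude.txt"));
  }
}
```

Every host then sits on the same line as the "List of ... hosts" prefix, so a single grep finds the complete list.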
4.
{code}
+ @Test
+ public void testHostFileReaderWithTimeout() throws Exception {
{code}
The test needs to be updated to include numeric hostnames.
5.
{code}
+
+ @Override
+ public Integer getDecommissioningTimeout() {
+ return null;
+ }
+ /**
+ * Get the DecommissionTimeout.
+ *
+ * @return decommissionTimeout
+ */
+ public abstract Integer getDecommissionTimeout();
}
{code}
Similar to above, can we use -1 instead of null?
6.
{code}
+ public static final String DECOMMISSIONING_DEFAULT_TIMEOUT_KEY =
+ RM_PREFIX + "decommissioning.default.timeout";
+ public static final int DEFAULT_DECOMMISSIONING_TIMEOUT = 3600;
{code}
Can you please rename DECOMMISSIONING_DEFAULT_TIMEOUT_KEY to
RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "decommissioning.default.timeout" to
"nodemanager-graceful-decommission-timeout-secs" and
DEFAULT_DECOMMISSIONING_TIMEOUT to DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT?
7.
{code}
+ public static final String NM_EXIT_WAIT_MS = NM_PREFIX + "exit-wait.ms";
+ public static final long DEFAULT_NM_EXIT_WAIT_MS = 5000;
{code}
I saw your reasoning for this in your earlier comments, but I'm not convinced
this should be in the YARN nodemanager. This seems like an issue with the EMR
setup. The change adds a wait time for all shutdowns. Please remove it.
8.
{code}
+ // Additional seconds to wait before forcefully decommission nodes.
+ // This is usually not needed since RM enforces timeout automatically.
+ final int gracePeriod = 20;
{code}
Can you explain why this is needed? And why 20 seconds for the grace period?
9.
{code}
+ if ("-g".equals(args[1]) || "-graceful".equals(args[1])) {
+ if (args.length == 3) {
+ int timeout = validateTimeout(args[2]);
+ return refreshNodes(timeout);
+ } else {
+ return refreshNodes(true);
+ }
+ }
{code}
Just to clarify my understanding here:
"yarn rmadmin -refreshNodes -g 1000" will decommission the nodes gracefully up
to a limit of 1000 seconds, after which it will forcefully shut down the nodes.
"yarn rmadmin -refreshNodes -g -1" will gracefully shut down the nodes with the
timeout being the value of
yarn.resourcemanager.node-graceful-decommission-timeout.
"yarn rmadmin -refreshNodes -g" is the same as "yarn rmadmin -refreshNodes -g -1".
Is my understanding correct?
10.
{code}
+ @Override
+ public synchronized void setDecommissionTimeout(Integer timeout) {
+ maybeInitBuilder();
+ if (timeout != null) {
+ builder.setDecommissionTimeout(timeout);
+ } else {
+ builder.clearDecommissionTimeout();
+ }
+ }
+
+ @Override
+ public synchronized Integer getDecommissionTimeout() {
+ RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null;
+ }
{code}
You've used Integer everywhere. Is there any reason you didn't go with
int (apart from the HashMap)?
11.
{code}
+ <name>yarn.resourcemanager.decommissioning.default.timeout</name>
{code}
Please change the name to match the config above.
12.
{code}
+ long exitWaitMs = conf.getLong(YarnConfiguration.NM_EXIT_WAIT_MS,
+ YarnConfiguration.DEFAULT_NM_EXIT_WAIT_MS);
+ LOG.fatal("Exit in " + exitWaitMs + " milliseconds");
+ if (exitWaitMs > 0) {
+ try {
+ Thread.sleep(exitWaitMs);
+ } catch (InterruptedException e) {
+ }
+ }
{code}
Please remove this.
13.
{code}
+import com.google.common.base.Stopwatch;
...
+ private Stopwatch pollWatch = new Stopwatch().start();
{code}
Please switch to the Hadoop stopwatch (org.apache.hadoop.util.StopWatch).
14.
{code}
+ // keep DECOMMISSIONED node for a while for status log.
+ if (context.decommissionedTime == 0) {
+ context.decommissionedTime = now;
+ } else if (now - context.decommissionedTime > 60000L) {
+ decomNodes.remove(rmNode.getNodeID());
+ LOG.info("remove " + rmNode.getState() + " " + rmNode.getNodeID());
+ }
{code}
Can you please explain why we need to keep nodes just for logging purposes
after they've been decommissioned? Is the state change to DECOMMISSIONED not
logged?
15.
{code}
+ public boolean checkReadyToBeDecommissioned(NodeId nodeId) {
...
+ removeCompletedApps(context);
{code}
Can we move the removeCompletedApps call outside this function? It doesn't seem
intuitive that the checkReadyToBeDecommissioned function needs to call
removeCompletedApps.
16.
{code}
+ readDecommissioningTimeout(null);
{code}
Can you please remove this call and the function implementation? Creating a
YarnConfiguration object is a fairly expensive call and there's no reason to be
doing it every time poll is called. The default value for the timeout is unlikely
to change so often as to require reading it repeatedly from the config file.
17.
{code}
+ LOG.info("Consider non-existing app " + appId + " as completed");
+ LOG.info("Remove " + rmApp.getState() + " app " + appId);
{code}
Please change the log levels for these to debug.
18.
{code}
+ private int getTimeoutInSec(DecommissioningNodeContext context) {
{code}
Rename getTimeoutInSec to getTimeoutTimestampInSec.
19.
{code}
+ private void logDecommissioningNodesStatus() {
+ if (decomNodes.size() == 0) {
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ long now = System.currentTimeMillis();
+ for (DecommissioningNodeContext d : decomNodes.values()) {
+ DecomNodeStatus s = getDecommissioningStatus(d.nodeId);
+ sb.append(String.format(
+ "%n %-34s %4ds fresh:%3ds containers:%2d %14s",
+ d.nodeId.getHost(),
+ (now - d.decommissioningStartTime) / 1000,
+ (now - d.lastUpdateTime) / 1000,
+ d.numActiveContainers,
+ s));
+ if (s == DecomNodeStatus.WAIT_APP ||
+ s == DecomNodeStatus.WAIT_CONTAINER) {
+ sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d)));
+ }
+ for (ApplicationId aid : d.appIds) {
+ sb.append("\n " + aid);
+ RMApp rmApp = rmContext.getRMApps().get(aid);
+ if (rmApp != null) {
+ sb.append(String.format(
+ " %s %9s %5.2f%% %5ds",
+ rmApp.getState(),
+ (rmApp.getApplicationType() == null)?
+ "" : rmApp.getApplicationType(),
+ 100.0 * rmApp.getProgress(),
+ (System.currentTimeMillis() - rmApp.getStartTime()) / 1000));
+ }
+ }
+ }
+ LOG.info("Decommissioning Nodes: " + sb.toString());
+ }
{code}
Instead of logging the status every 20 seconds, can we log it only in debug
mode? Is there any benefit to logging at info level?
20.
{code}
+ private DecommissioningNodesWatcher decomWatcher;
+
{code}
Please rename decomWatcher to decommissioningWatcher.
21.
{code}
+ // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED.
+ if (rmNode.getState() == NodeState.DECOMMISSIONING &&
+ decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID())) {
+ String message = "DECOMMISSIONING " + nodeId +
+ " is ready to be decommissioned";
+ LOG.info(message);
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+ return YarnServerBuilderUtils.newNodeHeartbeatResponse(
+ NodeAction.SHUTDOWN, message);
+ }
{code}
comes after
{code}
if (!this.nodesListManager.isValidNode(nodeId.getHost())
&& !isNodeInDecommissioning(nodeId)) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost();
LOG.info(message);
return YarnServerBuilderUtils.newNodeHeartbeatResponse(
NodeAction.SHUTDOWN, message);
}
{code}
Can you please clarify something here - won't this condition always be true if
decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID()) is true? When
will the code you added be called?
22.
{code}
+ public Set<NodeId> poll() {
+ if (decomNodes.size() == 0 ||
+ pollWatch.elapsedTime(TimeUnit.SECONDS) < 20) {
+ return emptyNodeIdSet;
+ }
+ return pollInternal();
+ }
+
+ private synchronized Set<NodeId> pollInternal() {
+ pollWatch.reset().start();
+ readDecommissioningTimeout(null);
+ logDecommissioningNodesStatus();
+ long now = System.currentTimeMillis();
+ Set<NodeId> output = new HashSet<NodeId>();
+
+ for (Iterator<Map.Entry<NodeId, DecommissioningNodeContext>> it =
+ decomNodes.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<NodeId, DecommissioningNodeContext> e = it.next();
+ DecommissioningNodeContext d = e.getValue();
+ // Skip node recently updated (NM usually updates every second).
+ if (now - d.lastUpdateTime < 30000L) {
+ continue;
+ }
+ // Remove stale non-DECOMMISSIONING node
+ if (d.nodeState != NodeState.DECOMMISSIONING) {
+ LOG.info("remove " + d.nodeState + " " + d.nodeId);
+ it.remove();
+ continue;
+ } else if (now - d.lastUpdateTime > 60000L) {
+ // Node DECOMMISSIONED could become stale, check RMNode state to remove.
+ RMNode rmNode = getRmNode(d.nodeId);
+ if (rmNode != null && rmNode.getState() == NodeState.DECOMMISSIONED) {
+ LOG.info("remove " + rmNode.getState() + " " + d.nodeId);
+ it.remove();
+ continue;
+ }
+ }
+ if (d.timeoutMs >= 0 && d.decommissioningStartTime + d.timeoutMs < now) {
+ output.add(d.nodeId);
+ LOG.info("Identified stale and timeout node " + d.nodeId);
+ }
+ }
+ return output;
+ }
{code}
{code}
+ pollDecommissioningNodesWatcher();
{code}
We shouldn't be calling this for every node heartbeat. It seems from your
implementation that this should really be in its own thread with a timer. That
way you can avoid checks like last called time, etc. I'm not even sure you need
to call pollDecommissioningNodesWatcher in ResourceTrackerService. It seems to
me that it can be contained entirely in DecommissioningNodesWatcher.
23.
{code}
+import com.google.common.base.Joiner;
{code}
Please use the Apache commons join from StringUtils.
24.
{code}
public class RMNodeEvent extends AbstractEvent<RMNodeEventType> {
private final NodeId nodeId;
+ // Optional decommissioning timeout in second.
+ private final Integer decommissioningTimeout;
public RMNodeEvent(NodeId nodeId, RMNodeEventType type) {
super(type);
this.nodeId = nodeId;
+ this.decommissioningTimeout = null;
+ }
+
+ // Create instance with optional timeout
+ // (timeout could be null which means use default).
+ public RMNodeEvent(NodeId nodeId, RMNodeEventType type, Integer timeout) {
+ super(type);
+ this.nodeId = nodeId;
+ this.decommissioningTimeout = timeout;
}
public NodeId getNodeId() {
return this.nodeId;
}
+
+ public Integer getDecommissioningTimeout() {
+ return this.decommissioningTimeout;
+ }
{code}
Instead of modifying the RMNodeEvent class, please create a new class which
inherits from RMNodeEvent for your purpose with the custom constructor.
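Roughly what I mean, shown with simplified stand-in classes so the sketch compiles on its own. NodeEvent stands in for RMNodeEvent (the real class extends AbstractEvent<RMNodeEventType> and takes a NodeId), and NodeDecommissioningEvent is a hypothetical name for the new subclass.

```java
public class DecommissioningEventSketch {
  // Simplified stand-in for the existing RMNodeEvent, left unmodified.
  static class NodeEvent {
    private final String nodeId;
    private final String type;

    public NodeEvent(String nodeId, String type) {
      this.nodeId = nodeId;
      this.type = type;
    }

    public String getNodeId() {
      return nodeId;
    }

    public String getType() {
      return type;
    }
  }

  // Hypothetical subclass that carries the decommissioning timeout,
  // so only decommissioning events pay for the extra field.
  static class NodeDecommissioningEvent extends NodeEvent {
    // Timeout in seconds; -1 is assumed to mean "use the default".
    private final int decommissioningTimeout;

    public NodeDecommissioningEvent(String nodeId, String type, int timeout) {
      super(nodeId, type);
      this.decommissioningTimeout = timeout;
    }

    public int getDecommissioningTimeout() {
      return decommissioningTimeout;
    }
  }

  public static void main(String[] args) {
    // The node id and event type strings are placeholders.
    NodeDecommissioningEvent event =
        new NodeDecommissioningEvent("host1:8041", "GRACEFUL_DECOMMISSION", 3600);
    System.out.println(event.getNodeId() + " " + event.getDecommissioningTimeout());
  }
}
```

Handlers that care about the timeout can cast to the subclass, and every other RMNodeEvent consumer stays untouched.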
25.
With regard to work preserving restart, I think the command should just exit
with a message saying that the feature is not supported when work preserving
restart is enabled and not do anything. Once support for work preserving
restart has been added, the message can be removed.
Again, my apologies for coming in late with the review.
> Automatic and Asynchronous Decommissioning Nodes Status Tracking
> ----------------------------------------------------------------
>
> Key: YARN-4676
> URL: https://issues.apache.org/jira/browse/YARN-4676
> Project: Hadoop YARN
> Issue Type: Sub-task
> Components: resourcemanager
> Affects Versions: 2.8.0
> Reporter: Daniel Zhi
> Assignee: Daniel Zhi
> Labels: features
> Attachments: GracefulDecommissionYarnNode.pdf,
> GracefulDecommissionYarnNode.pdf, YARN-4676.004.patch, YARN-4676.005.patch,
> YARN-4676.006.patch, YARN-4676.007.patch, YARN-4676.008.patch,
> YARN-4676.009.patch, YARN-4676.010.patch, YARN-4676.011.patch,
> YARN-4676.012.patch, YARN-4676.013.patch
>
>
> YARN-4676 implements an automatic, asynchronous and flexible mechanism to
> graceful decommission
> YARN nodes. After user issues the refreshNodes request, ResourceManager
> automatically evaluates
> status of all affected nodes to kicks out decommission or recommission
> actions. RM asynchronously
> tracks container and application status related to DECOMMISSIONING nodes to
> decommission the
> nodes immediately after there are ready to be decommissioned. Decommissioning
> timeout at individual
> nodes granularity is supported and could be dynamically updated. The
> mechanism naturally supports multiple
> independent graceful decommissioning “sessions” where each one involves
> different sets of nodes with
> different timeout settings. Such support is ideal and necessary for graceful
> decommission request issued
> by external cluster management software instead of human.
> DecommissioningNodeWatcher inside ResourceTrackingService tracks
> DECOMMISSIONING nodes status automatically and asynchronously after
> client/admin made the graceful decommission request. It tracks
> DECOMMISSIONING nodes status to decide when, after all running containers on
> the node have completed, will be transitioned into DECOMMISSIONED state.
> NodesListManager detect and handle include and exclude list changes to kick
> out decommission or recommission as necessary.