Author: bikas
Date: Tue Jul 16 22:54:55 2013
New Revision: 1503935
URL: http://svn.apache.org/r1503935
Log:
Merge r1503933 from trunk to branch-2 for YARN-513. Create common proxy client
for communicating with RM (Xuan Gong & Jian He via bikas)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
- copied unchanged from r1503933,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/
- copied from r1503933,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
- copied unchanged from r1503933,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
- copied unchanged from r1503933,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Jul 16 22:54:55 2013
@@ -465,6 +465,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-521. Augment AM - RM client module to be able to request containers
only at specific locations (Sandy Ryza via bikas)
+ YARN-513. Create common proxy client for communicating with RM. (Xuan Gong
+ & Jian He via bikas)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
Tue Jul 16 22:54:55 2013
@@ -655,17 +655,17 @@ public class YarnConfiguration extends C
public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS =
2000;
- /** Max time to wait to establish a connection to RM when NM starts
+ /** Max time to wait to establish a connection to RM
*/
- public static final String RESOURCEMANAGER_CONNECT_WAIT_SECS =
- NM_PREFIX + "resourcemanager.connect.wait.secs";
- public static final int DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS =
+ public static final String RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS =
+ RM_PREFIX + "resourcemanager.connect.max.wait.secs";
+ public static final int DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS =
15*60;
- /** Time interval between each NM attempt to connect to RM
+ /** Time interval between each attempt to connect to RM
*/
public static final String RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS =
- NM_PREFIX + "resourcemanager.connect.retry_interval.secs";
+ RM_PREFIX + "resourcemanager.connect.retry_interval.secs";
public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
= 30;
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
Tue Jul 16 22:54:55 2013
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.List;
import java.util.Set;
@@ -54,25 +53,6 @@ public abstract class YarnClient extends
return client;
}
- /**
- * Create a new instance of YarnClient.
- */
- @Public
- public static YarnClient createYarnClient(InetSocketAddress rmAddress) {
- YarnClient client = new YarnClientImpl(rmAddress);
- return client;
- }
-
- /**
- * Create a new instance of YarnClient.
- */
- @Public
- public static YarnClient createYarnClient(String name,
- InetSocketAddress rmAddress) {
- YarnClient client = new YarnClientImpl(name, rmAddress);
- return client;
- }
-
@Private
protected YarnClient(String name) {
super(name);
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
Tue Jul 16 22:54:55 2013
@@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -42,7 +40,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -56,16 +53,16 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@@ -171,28 +168,11 @@ public class AMRMClientImpl<T extends Co
@Override
protected void serviceStart() throws Exception {
final YarnConfiguration conf = new YarnConfiguration(getConfig());
- final YarnRPC rpc = YarnRPC.create(conf);
- final InetSocketAddress rmAddress = conf.getSocketAddr(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
- UserGroupInformation currentUser;
try {
- currentUser = UserGroupInformation.getCurrentUser();
+ rmClient = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
-
- // CurrentUser should already have AMToken loaded.
- rmClient = currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
- @Override
- public ApplicationMasterProtocol run() {
- return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress,
- conf);
- }
- });
- LOG.debug("Connecting to ResourceManager at " + rmAddress);
super.serviceStart();
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
Tue Jul 16 22:54:55 2013
@@ -59,11 +59,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.annotations.VisibleForTesting;
@@ -81,16 +82,7 @@ public class YarnClientImpl extends Yarn
private static final String ROOT = "root";
public YarnClientImpl() {
- this(null);
- }
-
- public YarnClientImpl(InetSocketAddress rmAddress) {
- this(YarnClientImpl.class.getName(), rmAddress);
- }
-
- public YarnClientImpl(String name, InetSocketAddress rmAddress) {
- super(name);
- this.rmAddress = rmAddress;
+ super(YarnClientImpl.class.getName());
}
private static InetSocketAddress getRmAddress(Configuration conf) {
@@ -100,9 +92,7 @@ public class YarnClientImpl extends Yarn
@Override
protected void serviceInit(Configuration conf) throws Exception {
- if (this.rmAddress == null) {
- this.rmAddress = getRmAddress(conf);
- }
+ this.rmAddress = getRmAddress(conf);
statePollIntervalMillis = conf.getLong(
YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
@@ -111,12 +101,11 @@ public class YarnClientImpl extends Yarn
@Override
protected void serviceStart() throws Exception {
- YarnRPC rpc = YarnRPC.create(getConfig());
-
- this.rmClient = (ApplicationClientProtocol) rpc.getProxy(
- ApplicationClientProtocol.class, rmAddress, getConfig());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to ResourceManager at " + rmAddress);
+ try {
+ rmClient = ClientRMProxy.createRMProxy(getConfig(),
+ ApplicationClientProtocol.class);
+ } catch (IOException e) {
+ throw new YarnRuntimeException(e);
}
super.serviceStart();
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
Tue Jul 16 22:54:55 2013
@@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.client.cli;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -31,11 +29,11 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -164,32 +162,10 @@ public class RMAdminCLI extends Configur
}
}
- private static UserGroupInformation getUGI(Configuration conf
- ) throws IOException {
- return UserGroupInformation.getCurrentUser();
- }
-
private ResourceManagerAdministrationProtocol createAdminProtocol() throws
IOException {
// Get the current configuration
final YarnConfiguration conf = new YarnConfiguration(getConf());
-
- // Create the client
- final InetSocketAddress addr = conf.getSocketAddr(
- YarnConfiguration.RM_ADMIN_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
- final YarnRPC rpc = YarnRPC.create(conf);
-
- ResourceManagerAdministrationProtocol adminProtocol =
- getUGI(conf).doAs(new PrivilegedAction<ResourceManagerAdministrationProtocol>() {
- @Override
- public ResourceManagerAdministrationProtocol run() {
- return (ResourceManagerAdministrationProtocol) rpc.getProxy(ResourceManagerAdministrationProtocol.class,
- addr, conf);
- }
- });
-
- return adminProtocol;
+ return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class);
}
private int refreshQueues() throws IOException, YarnException {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java
Tue Jul 16 22:54:55 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.api.impl.pb.client;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.server.api
import com.google.protobuf.ServiceException;
-public class ResourceTrackerPBClientImpl implements ResourceTracker {
+public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
private ResourceTrackerPB proxy;
@@ -50,7 +51,14 @@ private ResourceTrackerPB proxy;
proxy = (ResourceTrackerPB)RPC.getProxy(
ResourceTrackerPB.class, clientVersion, addr, conf);
}
-
+
+ @Override
+ public void close() {
+ if(this.proxy != null) {
+ RPC.stopProxy(this.proxy);
+ }
+ }
+
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
Tue Jul 16 22:54:55 2013
@@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -47,9 +48,9 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -77,7 +78,6 @@ public class NodeStatusUpdaterImpl exten
private NodeId nodeId;
private long nextHeartBeatInterval;
private ResourceTracker resourceTracker;
- private InetSocketAddress rmAddress;
private Resource totalResource;
private int httpPort;
private volatile boolean isStopped;
@@ -91,9 +91,6 @@ public class NodeStatusUpdaterImpl exten
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
- private long rmConnectWaitMS;
- private long rmConnectionRetryIntervalMS;
- private boolean waitForEver;
private Runnable statusUpdaterRunnable;
private Thread statusUpdater;
@@ -110,11 +107,6 @@ public class NodeStatusUpdaterImpl exten
@Override
protected void serviceInit(Configuration conf) throws Exception {
- this.rmAddress = conf.getSocketAddr(
- YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
-
int memoryMb =
conf.getInt(
YarnConfiguration.NM_PMEM_MB,
YarnConfiguration.DEFAULT_NM_PMEM_MB);
@@ -153,6 +145,7 @@ public class NodeStatusUpdaterImpl exten
try {
// Registration has to be in start so that ContainerManager can get the
// perNM tokens needed to authenticate ContainerTokens.
+ this.resourceTracker = getRMClient();
registerWithRM();
super.serviceStart();
startStatusUpdater();
@@ -167,6 +160,7 @@ public class NodeStatusUpdaterImpl exten
protected void serviceStop() throws Exception {
// Interrupt the updater.
this.isStopped = true;
+ stopRMProxy();
super.serviceStop();
}
@@ -188,6 +182,13 @@ public class NodeStatusUpdaterImpl exten
}
}
+ @VisibleForTesting
+ protected void stopRMProxy() {
+ if(this.resourceTracker != null) {
+ RPC.stopProxy(this.resourceTracker);
+ }
+ }
+
@Private
protected boolean isTokenKeepAliveEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
@@ -195,93 +196,22 @@ public class NodeStatusUpdaterImpl exten
&& UserGroupInformation.isSecurityEnabled();
}
- protected ResourceTracker getRMClient() {
+ @VisibleForTesting
+ protected ResourceTracker getRMClient() throws IOException {
Configuration conf = getConfig();
- YarnRPC rpc = YarnRPC.create(conf);
- return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
- conf);
+ return ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
}
@VisibleForTesting
protected void registerWithRM() throws YarnException, IOException {
- Configuration conf = getConfig();
- rmConnectWaitMS =
- conf.getInt(
- YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
- YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
- * 1000;
- rmConnectionRetryIntervalMS =
- conf.getLong(
- YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
- YarnConfiguration
- .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
- * 1000;
-
- if(rmConnectionRetryIntervalMS < 0) {
- throw new YarnRuntimeException("Invalid Configuration. " +
- YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
- " should not be negative.");
- }
-
- waitForEver = (rmConnectWaitMS == -1000);
-
- if(! waitForEver) {
- if(rmConnectWaitMS < 0) {
- throw new YarnRuntimeException("Invalid Configuration. " +
- YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS +
- " can be -1, but can not be other negative numbers");
- }
-
- //try connect once
- if(rmConnectWaitMS < rmConnectionRetryIntervalMS) {
- LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
- + " is smaller than "
- + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
- + ". Only try connect once.");
- rmConnectWaitMS = 0;
- }
- }
-
- int rmRetryCount = 0;
- long waitStartTime = System.currentTimeMillis();
-
RegisterNodeManagerRequest request =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
request.setHttpPort(this.httpPort);
request.setResource(this.totalResource);
request.setNodeId(this.nodeId);
- RegisterNodeManagerResponse regNMResponse;
-
- while(true) {
- try {
- rmRetryCount++;
- LOG.info("Connecting to ResourceManager at " + this.rmAddress
- + ". current no. of attempts is " + rmRetryCount);
- this.resourceTracker = getRMClient();
- regNMResponse =
- this.resourceTracker.registerNodeManager(request);
- this.rmIdentifier = regNMResponse.getRMIdentifier();
- break;
- } catch(Throwable e) {
- LOG.warn("Trying to connect to ResourceManager, " +
- "current no. of failed attempts is "+rmRetryCount);
- if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
- || waitForEver) {
- try {
- LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
- + " seconds before next connection retry to RM");
- Thread.sleep(rmConnectionRetryIntervalMS);
- } catch(InterruptedException ex) {
- //done nothing
- }
- } else {
- String errorMessage = "Failed to Connect to RM, " +
- "no. of failed attempts is "+rmRetryCount;
- LOG.error(errorMessage,e);
- throw new YarnRuntimeException(errorMessage,e);
- }
- }
- }
+ RegisterNodeManagerResponse regNMResponse =
+ resourceTracker.registerNodeManager(request);
+ this.rmIdentifier = regNMResponse.getRMIdentifier();
// if the Resourcemanager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
String message =
@@ -426,8 +356,6 @@ public class NodeStatusUpdaterImpl exten
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
- int rmRetryCount = 0;
- long waitStartTime = System.currentTimeMillis();
NodeStatus nodeStatus =
getNodeStatusAndUpdateContainersInContext();
nodeStatus.setResponseId(lastHeartBeatID);
@@ -440,31 +368,7 @@ public class NodeStatusUpdaterImpl exten
request
.setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey());
- while (!isStopped) {
- try {
- rmRetryCount++;
- response = resourceTracker.nodeHeartbeat(request);
- break;
- } catch (Throwable e) {
- LOG.warn("Trying to heartbeat to ResourceManager, "
- + "current no. of failed attempts is " + rmRetryCount);
- if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
- || waitForEver) {
- try {
- LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
- + " seconds before next heartbeat to RM");
- Thread.sleep(rmConnectionRetryIntervalMS);
- } catch(InterruptedException ex) {
- //done nothing
- }
- } else {
- String errorMessage = "Failed to heartbeat to RM, " +
- "no. of failed attempts is "+rmRetryCount;
- LOG.error(errorMessage,e);
- throw new YarnRuntimeException(errorMessage,e);
- }
- }
- }
+ response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
updateMasterKeys(response);
@@ -508,11 +412,11 @@ public class NodeStatusUpdaterImpl exten
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
}
- } catch (YarnRuntimeException e) {
+ } catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
- throw e;
+ throw new YarnRuntimeException(e);
} catch (Throwable e) {
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
Tue Jul 16 22:54:55 2013
@@ -61,6 +61,10 @@ public class MockNodeStatusUpdater exten
protected ResourceTracker getRMClient() {
return resourceTracker;
}
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
private static class MockResourceTracker implements ResourceTracker {
private int heartBeatID;
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
Tue Jul 16 22:54:55 2013
@@ -107,6 +107,11 @@ public class TestEventFlow {
return new LocalRMInterface();
};
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
+
@Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
Tue Jul 16 22:54:55 2013
@@ -41,6 +41,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.ServiceOperations;
@@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -60,9 +63,9 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -103,11 +106,17 @@ public class TestNodeStatusUpdater {
volatile int heartBeatID = 0;
volatile Throwable nmStartError = null;
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
- private final Configuration conf = createNMConfig();
+ private boolean triggered = false;
+ private Configuration conf;
private NodeManager nm;
private boolean containerStatusBackupSuccessfully = true;
private List<ContainerStatus> completedContainerStatusList = new
ArrayList<ContainerStatus>();
+ @Before
+ public void setUp() {
+ conf = createNMConfig();
+ }
+
@After
public void tearDown() {
this.registeredNodes.clear();
@@ -274,6 +283,11 @@ public class TestNodeStatusUpdater {
protected ResourceTracker getRMClient() {
return resourceTracker;
}
+
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
}
private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
@@ -290,6 +304,10 @@ public class TestNodeStatusUpdater {
return resourceTracker;
}
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
}
private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
@@ -307,7 +325,12 @@ public class TestNodeStatusUpdater {
protected ResourceTracker getRMClient() {
return resourceTracker;
}
-
+
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
+
@Override
protected boolean isTokenKeepAliveEnabled(Configuration conf) {
return true;
@@ -315,21 +338,16 @@ public class TestNodeStatusUpdater {
}
private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
- public ResourceTracker resourceTracker =
- new MyResourceTracker(this.context);
+
private Context context;
- private long waitStartTime;
private final long rmStartIntervalMS;
private final boolean rmNeverStart;
- private volatile boolean triggered = false;
- private long durationWhenTriggered = -1;
-
+ public ResourceTracker resourceTracker;
public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
long rmStartIntervalMS, boolean rmNeverStart) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
- this.waitStartTime = System.currentTimeMillis();
this.rmStartIntervalMS = rmStartIntervalMS;
this.rmNeverStart = rmNeverStart;
}
@@ -337,25 +355,16 @@ public class TestNodeStatusUpdater {
@Override
protected void serviceStart() throws Exception {
//record the startup time
- this.waitStartTime = System.currentTimeMillis();
super.serviceStart();
}
@Override
- protected ResourceTracker getRMClient() {
- if (!triggered) {
- long t = System.currentTimeMillis();
- long duration = t - waitStartTime;
- if (duration <= rmStartIntervalMS
- || rmNeverStart) {
- throw new YarnRuntimeException("Faking RM start failure as start " +
- "delay timer has not expired.");
- } else {
- //triggering
- triggered = true;
- durationWhenTriggered = duration;
- }
- }
+ protected ResourceTracker getRMClient() throws IOException {
+ RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+ resourceTracker =
+ (ResourceTracker) RetryProxy.create(ResourceTracker.class,
+ new MyResourceTracker6(this.context, rmStartIntervalMS,
+ rmNeverStart), retryPolicy);
return resourceTracker;
}
@@ -363,37 +372,35 @@ public class TestNodeStatusUpdater {
return triggered;
}
- private long getWaitStartTime() {
- return waitStartTime;
- }
-
- private long getDurationWhenTriggered() {
- return durationWhenTriggered;
- }
-
@Override
- public String toString() {
- return "MyNodeStatusUpdater4{" +
- "rmNeverStart=" + rmNeverStart +
- ", triggered=" + triggered +
- ", duration=" + durationWhenTriggered +
- ", rmStartIntervalMS=" + rmStartIntervalMS +
- '}';
+ protected void stopRMProxy() {
+ return;
}
}
+
+
private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
private ResourceTracker resourceTracker;
+ private Configuration conf;
public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
- NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) {
super(context, dispatcher, healthChecker, metrics);
resourceTracker = new MyResourceTracker5();
+ this.conf = conf;
}
@Override
protected ResourceTracker getRMClient() {
- return resourceTracker;
+ RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+ return (ResourceTracker) RetryProxy.create(ResourceTracker.class,
+ resourceTracker, retryPolicy);
+ }
+
+ @Override
+ protected void stopRMProxy() {
+ return;
}
}
@@ -417,15 +424,18 @@ public class TestNodeStatusUpdater {
public boolean isStopped = false;
private NodeStatusUpdater nodeStatusUpdater;
private CyclicBarrier syncBarrier;
- public MyNodeManager2 (CyclicBarrier syncBarrier) {
+ private Configuration conf;
+
+ public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) {
this.syncBarrier = syncBarrier;
+ this.conf = conf;
}
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
nodeStatusUpdater =
new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
- metrics);
+ metrics, conf);
return nodeStatusUpdater;
}
@@ -577,7 +587,7 @@ public class TestNodeStatusUpdater {
.get(4).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(4)
.getContainerId().getId() == 5);
- throw new YarnRuntimeException("Lost the heartbeat response");
+ throw new java.net.ConnectException("Lost the heartbeat response");
} else if (heartBeatID == 2) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
.size(), 7);
@@ -646,7 +656,63 @@ public class TestNodeStatusUpdater {
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
heartBeatID++;
- throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+ throw new java.net.ConnectException(
+ "NodeHeartbeat exception");
+ }
+ }
+
+ private class MyResourceTracker6 implements ResourceTracker {
+
+ private final Context context;
+ private long rmStartIntervalMS;
+ private boolean rmNeverStart;
+ private final long waitStartTime;
+
+ public MyResourceTracker6(Context context, long rmStartIntervalMS,
+ boolean rmNeverStart) {
+ this.context = context;
+ this.rmStartIntervalMS = rmStartIntervalMS;
+ this.rmNeverStart = rmNeverStart;
+ this.waitStartTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnException, IOException,
+ IOException {
+ if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
+ || rmNeverStart) {
+ throw new java.net.ConnectException("Faking RM start failure as start "
+ + "delay timer has not expired.");
+ } else {
+ NodeId nodeId = request.getNodeId();
+ Resource resource = request.getResource();
+ LOG.info("Registering " + nodeId.toString());
+ // NOTE: this really should be checking against the config value
+ InetSocketAddress expected = NetUtils.getConnectAddress(
+ conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
+ Assert.assertEquals(NetUtils.getHostPortString(expected),
+ nodeId.toString());
+ Assert.assertEquals(5 * 1024, resource.getMemory());
+ registeredNodes.add(nodeId);
+
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ triggered = true;
+ return response;
+ }
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnException, IOException {
+ NodeStatus nodeStatus = request.getNodeStatus();
+ nodeStatus.setResponseId(heartBeatID++);
+
+ NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+ newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null,
+ null, null, null, 1000L);
+ return nhResponse;
}
}
@@ -843,8 +909,7 @@ public class TestNodeStatusUpdater {
final long connectionRetryIntervalSecs = 1;
//Waiting for rmStartIntervalMS, RM will be started
final long rmStartIntervalMS = 2*1000;
- YarnConfiguration conf = createNMConfig();
- conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+ conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
connectionWaitSecs);
conf.setLong(YarnConfiguration
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
@@ -907,8 +972,6 @@ public class TestNodeStatusUpdater {
}
long duration = System.currentTimeMillis() - waitStartTime;
MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater;
- Assert.assertTrue("Updater was never started",
- myUpdater.getWaitStartTime()>0);
Assert.assertTrue("NM started before updater triggered",
myUpdater.isTriggered());
Assert.assertTrue("NM should have connected to RM after "
@@ -1037,13 +1100,13 @@ public class TestNodeStatusUpdater {
final long connectionWaitSecs = 1;
final long connectionRetryIntervalSecs = 1;
YarnConfiguration conf = createNMConfig();
- conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+ conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
connectionWaitSecs);
conf.setLong(YarnConfiguration
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
connectionRetryIntervalSecs);
CyclicBarrier syncBarrier = new CyclicBarrier(2);
- nm = new MyNodeManager2(syncBarrier);
+ nm = new MyNodeManager2(syncBarrier, conf);
nm.init(conf);
nm.start();
try {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
Tue Jul 16 22:54:55 2013
@@ -118,6 +118,11 @@ public abstract class BaseContainerManag
};
@Override
+ protected void stopRMProxy() {
+ return;
+ }
+
+ @Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1503935&r1=1503934&r2=1503935&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Tue Jul 16 22:54:55 2013
@@ -390,6 +390,11 @@ public class MiniYARNCluster extends Com
}
};
};
+
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
};
};
}