Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Aug 19 23:49:39 2014 @@ -126,6 +126,10 @@ public class YarnConfiguration extends C public static final String DEFAULT_RM_ADDRESS = "0.0.0.0:" + DEFAULT_RM_PORT; + /** The actual bind address for the RM.*/ + public static final String RM_BIND_HOST = + RM_PREFIX + "bind-host"; + /** The number of threads used to handle applications manager requests.*/ public static final String RM_CLIENT_THREAD_COUNT = RM_PREFIX + "client.thread-count"; @@ -263,6 +267,17 @@ public class YarnConfiguration extends C public static final String RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY = RM_PREFIX + "webapp.spnego-keytab-file"; + /** + * Flag to enable override of the default kerberos authentication filter with + * the RM authentication filter to allow authentication using delegation + * tokens(fallback to kerberos if the tokens are missing). Only applicable + * when the http authentication type is kerberos. 
+ */ + public static final String RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER = RM_PREFIX + + "webapp.delegation-token-auth-filter.enabled"; + public static final boolean DEFAULT_RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER = + true; + /** How long to wait until a container is considered dead.*/ public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = RM_PREFIX + "rm.container-allocation.expiry-interval-ms"; @@ -318,17 +333,24 @@ public class YarnConfiguration extends C public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; + @Private + public static final String RM_WORK_PRESERVING_RECOVERY_ENABLED = RM_PREFIX + + "work-preserving-recovery.enabled"; + @Private + public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED = + false; + /** Zookeeper interaction configs */ public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-"; public static final String RM_ZK_ADDRESS = RM_ZK_PREFIX + "address"; public static final String RM_ZK_NUM_RETRIES = RM_ZK_PREFIX + "num-retries"; - public static final int DEFAULT_ZK_RM_NUM_RETRIES = 500; + public static final int DEFAULT_ZK_RM_NUM_RETRIES = 1000; public static final String RM_ZK_RETRY_INTERVAL_MS = RM_ZK_PREFIX + "retry-interval-ms"; - public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 2000; + public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000; public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms"; public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000; @@ -527,6 +549,10 @@ public class YarnConfiguration extends C public static final String DEFAULT_NM_ADDRESS = "0.0.0.0:" + DEFAULT_NM_PORT; + /** The actual bind address for the NM.*/ + public static final String NM_BIND_HOST = + NM_PREFIX + "bind-host"; + /** who will execute(launch) the containers.*/ public static final String NM_CONTAINER_EXECUTOR = NM_PREFIX + "container-executor.class"; @@ -908,7 +934,7 @@ public class 
YarnConfiguration extends C PROXY_PREFIX + "address"; public static final int DEFAULT_PROXY_PORT = 9099; public static final String DEFAULT_PROXY_ADDRESS = - "0.0.0.0:" + DEFAULT_RM_PORT; + "0.0.0.0:" + DEFAULT_PROXY_PORT; /** * YARN Service Level Authorization @@ -1105,7 +1131,7 @@ public class YarnConfiguration extends C /** The setting that controls whether timeline service is enabled or not. */ public static final String TIMELINE_SERVICE_ENABLED = TIMELINE_SERVICE_PREFIX + "enabled"; - public static final boolean DEFAULT_TIMELINE_SERVICE_ENABLED = true; + public static final boolean DEFAULT_TIMELINE_SERVICE_ENABLED = false; /** host:port address for timeline service RPC APIs. */ public static final String TIMELINE_SERVICE_ADDRESS = @@ -1114,6 +1140,10 @@ public class YarnConfiguration extends C public static final String DEFAULT_TIMELINE_SERVICE_ADDRESS = "0.0.0.0:" + DEFAULT_TIMELINE_SERVICE_PORT; + /** The listening endpoint for the timeline service application.*/ + public static final String TIMELINE_SERVICE_BIND_HOST = + TIMELINE_SERVICE_PREFIX + "bind-host"; + /** The number of threads to handle client RPC API requests. 
*/ public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT = TIMELINE_SERVICE_PREFIX + "handler-thread-count"; @@ -1136,14 +1166,6 @@ public class YarnConfiguration extends C public static final String DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" + DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_PORT; - /**The kerberos principal to be used for spnego filter for timeline service.*/ - public static final String TIMELINE_SERVICE_WEBAPP_SPNEGO_USER_NAME_KEY = - TIMELINE_SERVICE_PREFIX + "webapp.spnego-principal"; - - /**The kerberos keytab to be used for spnego filter for timeline service.*/ - public static final String TIMELINE_SERVICE_WEBAPP_SPNEGO_KEYTAB_FILE_KEY = - TIMELINE_SERVICE_PREFIX + "webapp.spnego-keytab-file"; - /** Timeline service store class */ public static final String TIMELINE_SERVICE_STORE = TIMELINE_SERVICE_PREFIX + "store-class"; @@ -1196,6 +1218,14 @@ public class YarnConfiguration extends C public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS = 1000 * 60 * 5; + /** The Kerberos principal for the timeline server.*/ + public static final String TIMELINE_SERVICE_PRINCIPAL = + TIMELINE_SERVICE_PREFIX + "principal"; + + /** The Kerberos keytab for the timeline server.*/ + public static final String TIMELINE_SERVICE_KEYTAB = + TIMELINE_SERVICE_PREFIX + "keytab"; + //////////////////////////////// // Other Configs //////////////////////////////// @@ -1340,7 +1370,7 @@ public class YarnConfiguration extends C public static String getClusterId(Configuration conf) { String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID); if (clusterId == null) { - throw new HadoopIllegalArgumentException("Configuration doesn't specify" + + throw new HadoopIllegalArgumentException("Configuration doesn't specify " + YarnConfiguration.RM_CLUSTER_ID); } return clusterId;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java Tue Aug 19 23:49:39 2014 @@ -24,10 +24,8 @@ import org.apache.hadoop.yarn.api.protoc /** * This exception is thrown when an ApplicationMaster asks for resources by - * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries - * to unregister by calling - * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)} - * API without first registering by calling + * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} + * without first registering by calling * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} * or if it tries to register more than once. 
*/ Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto Tue Aug 19 23:49:39 2014 @@ -127,11 +127,11 @@ message ApplicationAttemptStateDataProto optional string diagnostics = 6 [default = "N/A"]; optional int64 start_time = 7; optional FinalApplicationStatusProto final_application_status = 8; + optional int32 am_container_exit_status = 9 [default = -1000]; } -message RMStateVersionProto { - optional int32 major_version = 1; - optional int32 minor_version = 2; +message EpochProto { + optional int64 epoch = 1; } ////////////////////////////////////////////////////////////////// Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto Tue Aug 19 
23:49:39 2014 @@ -92,7 +92,7 @@ message ContainerReportProto { optional ResourceProto resource = 2; optional NodeIdProto node_id = 3; optional PriorityProto priority = 4; - optional int64 start_time = 5; + optional int64 creation_time = 5; optional int64 finish_time = 6; optional string diagnostics_info = 7 [default = "N/A"]; optional string log_url = 8; Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Tue Aug 19 23:49:39 2014 @@ -85,6 +85,7 @@ message AllocateResponseProto { repeated NMTokenProto nm_tokens = 9; repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; + optional hadoop.common.TokenProto am_rm_token = 12; } ////////////////////////////////////////////////////// Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml 
(original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml Tue Aug 19 23:49:39 2014 @@ -38,24 +38,6 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>commons-el</groupId> - <artifactId>commons-el</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1-jetty</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Aug 19 23:49:39 2014 @@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Priority; import 
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -94,7 +95,6 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; @@ -208,7 +208,8 @@ public class ApplicationMaster { // App Master configuration // No. of containers to run shell command on - private int numTotalContainers = 1; + @VisibleForTesting + protected int numTotalContainers = 1; // Memory to request for the container on which the shell command will run private int containerMemory = 10; // VirtualCores to request for the container on which the shell command will run @@ -522,6 +523,8 @@ public class ApplicationMaster { + appAttemptID.toString(), e); } + // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class + // are marked as LimitedPrivate Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); DataOutputBuffer dob = new DataOutputBuffer(); @@ -592,8 +595,8 @@ public class ApplicationMaster { List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); - LOG.info("Received " + previousAMRunningContainers.size() - + " previous AM's running containers on AM registration."); + LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() + + " previous attempts' running containers on AM registration."); numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); int numTotalContainersToRequest = @@ -608,7 +611,7 @@ public 
class ApplicationMaster { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); } - numRequestedContainers.set(numTotalContainersToRequest); + numRequestedContainers.set(numTotalContainers); try { publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END); @@ -687,7 +690,7 @@ public class ApplicationMaster { LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size()); for (ContainerStatus containerStatus : completedContainers) { - LOG.info("Got container status for containerID=" + LOG.info(appAttemptID + " got container status for containerID=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" @@ -900,11 +903,6 @@ public class ApplicationMaster { public void run() { LOG.info("Setting up container launch container for containerid=" + container.getId()); - ContainerLaunchContext ctx = Records - .newRecord(ContainerLaunchContext.class); - - // Set the environment - ctx.setEnvironment(shellEnv); // Set the local resources Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); @@ -935,16 +933,13 @@ public class ApplicationMaster { return; } - LocalResource shellRsrc = Records.newRecord(LocalResource.class); - shellRsrc.setType(LocalResourceType.FILE); - shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + URL yarnUrl = null; try { - shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI( - renamedScriptPath.toString()))); + yarnUrl = ConverterUtils.getYarnUrlFromURI( + new URI(renamedScriptPath.toString())); } catch (URISyntaxException e) { LOG.error("Error when trying to use shell script path specified" + " in env, path=" + renamedScriptPath, e); - // A failure scenario on bad input such as invalid shell script path // We know we cannot continue launching the container // so we should release it. 
@@ -953,13 +948,13 @@ public class ApplicationMaster { numFailedContainers.incrementAndGet(); return; } - shellRsrc.setTimestamp(shellScriptPathTimestamp); - shellRsrc.setSize(shellScriptPathLen); + LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + shellScriptPathLen, shellScriptPathTimestamp); localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : ExecShellStringPath, shellRsrc); shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; } - ctx.setLocalResources(localResources); // Set the necessary command to execute on the allocated container Vector<CharSequence> vargs = new Vector<CharSequence>(5); @@ -986,16 +981,18 @@ public class ApplicationMaster { List<String> commands = new ArrayList<String>(); commands.add(command.toString()); - ctx.setCommands(commands); - // Set up tokens for the container too. Today, for normal shell commands, - // the container in distribute-shell doesn't need any tokens. We are - // populating them mainly for NodeManagers to be able to download any - // files in the distributed file-system. The tokens are otherwise also - // useful in cases, for e.g., when one is running a "hadoop dfs" command - // inside the distributed shell. - ctx.setTokens(allTokens.duplicate()); + // Set up ContainerLaunchContext, setting local resource, environment, + // command and token for constructor. + // Note for tokens: Set up tokens for the container too. Today, for normal + // shell commands, the container in distribute-shell doesn't need any + // tokens. We are populating them mainly for NodeManagers to be able to + // download any files in the distributed file-system. The tokens are + // otherwise also useful in cases, for e.g., when one is running a + // "hadoop dfs" command inside the distributed shell. 
+ ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( + localResources, shellEnv, commands, null, allTokens.duplicate(), null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } @@ -1024,15 +1021,13 @@ public class ApplicationMaster { // setup requirements for hosts // using * as any host will do for the distributed shell app // set the priority for the request - Priority pri = Records.newRecord(Priority.class); // TODO - what is the range for priority? how to decide? - pri.setPriority(requestPriority); + Priority pri = Priority.newInstance(requestPriority); // Set up resource type requirements // For now, memory and CPU are supported so we set memory and cpu requirements - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(containerMemory); - capability.setVirtualCores(containerVirtualCores); + Resource capability = Resource.newInstance(containerMemory, + containerVirtualCores); ContainerRequest request = new ContainerRequest(capability, null, null, pri); @@ -1059,8 +1054,8 @@ public class ApplicationMaster { TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); - entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() - .toString()); + entity.addPrimaryFilter("user", + UserGroupInformation.getCurrentUser().getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_START.toString()); @@ -1076,8 +1071,8 @@ public class ApplicationMaster { TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getContainerId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); - entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() - .toString()); + entity.addPrimaryFilter("user", + 
UserGroupInformation.getCurrentUser().getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_END.toString()); @@ -1094,8 +1089,8 @@ public class ApplicationMaster { TimelineEntity entity = new TimelineEntity(); entity.setEntityId(appAttemptId); entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); - entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() - .toString()); + entity.addPrimaryFilter("user", + UserGroupInformation.getCurrentUser().getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setEventType(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Tue Aug 19 23:49:39 2014 @@ -456,9 +456,6 @@ public class Client { appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); - // Set up the container launch context for the application 
master - ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); - // set local resources for the application master // local files or archives as needed // In this scenario, the jar file for the application master is part of the local resources @@ -508,8 +505,6 @@ public class Client { addToLocalResources(fs, null, shellArgsPath, appId.toString(), localResources, StringUtils.join(shellArgs, " ")); } - // Set local resource info into app master container launch context - amContainer.setLocalResources(localResources); // Set the necessary security tokens as needed //amContainer.setContainerTokens(containerToken); @@ -550,8 +545,6 @@ public class Client { env.put("CLASSPATH", classPathEnv.toString()); - amContainer.setEnvironment(env); - // Set the necessary command to execute the application master Vector<CharSequence> vargs = new Vector<CharSequence>(30); @@ -587,14 +580,15 @@ public class Client { LOG.info("Completed setting up app master command " + command.toString()); List<String> commands = new ArrayList<String>(); commands.add(command.toString()); - amContainer.setCommands(commands); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( + localResources, env, commands, null, null, null); // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and // vcores requirements - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(amMemory); - capability.setVirtualCores(amVCores); + Resource capability = Resource.newInstance(amMemory, amVCores); appContext.setResource(capability); // Service data is a binary blob that can be passed to the application @@ -603,6 +597,7 @@ public class Client { // Setup security tokens if (UserGroupInformation.isSecurityEnabled()) { + // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce Credentials credentials = new 
Credentials(); String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); if (tokenRenewer == null || tokenRenewer.length() == 0) { @@ -627,9 +622,8 @@ public class Client { appContext.setAMContainerSpec(amContainer); // Set the priority for the application master - Priority pri = Records.newRecord(Priority.class); // TODO - what is the range for priority? how to decide? - pri.setPriority(amPriority); + Priority pri = Priority.newInstance(amPriority); appContext.setPriority(pri); // Set the queue to which this application is to be submitted in the RM Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java Tue Aug 19 23:49:39 2014 @@ -36,9 +36,11 @@ public class TestDSFailedAppMaster exten if (appAttemptID.getAttemptId() == 2) { // should reuse the earlier running container, so numAllocatedContainers // should be set to 1. And should ask no more containers, so - // numRequestedContainers should be set to 0. 
+ // numRequestedContainers should be the same as numTotalContainers. + // The only container is the container requested by the AM in the first + // attempt. if (numAllocatedContainers.get() != 1 - || numRequestedContainers.get() != 0) { + || numRequestedContainers.get() != numTotalContainers) { LOG.info("NumAllocatedContainers is " + numAllocatedContainers.get() + " and NumRequestedContainers is " + numAllocatedContainers.get() + ".Application Master failed. exiting"); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Tue Aug 19 23:49:39 2014 @@ -26,13 +26,13 @@ import java.io.FileReader; import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; +import java.net.InetAddress; import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; 
import org.apache.hadoop.conf.Configuration; @@ -73,6 +73,7 @@ public class TestDistributedShell { conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); conf.set("yarn.log.dir", "target"); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); if (yarnCluster == null) { yarnCluster = new MiniYARNCluster( TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true); @@ -168,7 +169,9 @@ public class TestDistributedShell { yarnClient.init(new Configuration(yarnCluster.getConfig())); yarnClient.start(); String hostName = NetUtils.getHostname(); + boolean verified = false; + String errorMessage = ""; while(!verified) { List<ApplicationReport> apps = yarnClient.getApplications(); if (apps.size() == 0 ) { @@ -176,15 +179,22 @@ public class TestDistributedShell { continue; } ApplicationReport appReport = apps.get(0); - if (appReport.getHost().startsWith(hostName) - && appReport.getRpcPort() == -1) { + if(appReport.getHost().equals("N/A")) { + Thread.sleep(10); + continue; + } + errorMessage = + "Expected host name to start with '" + hostName + "', was '" + + appReport.getHost() + "'. Expected rpc port to be '-1', was '" + + appReport.getRpcPort() + "'."; + if (checkHostname(appReport.getHost()) && appReport.getRpcPort() == -1) { verified = true; } if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) { break; } } - Assert.assertTrue(verified); + Assert.assertTrue(errorMessage, verified); t.join(); LOG.info("Client run completed. Result=" + result); Assert.assertTrue(result.get()); @@ -211,6 +221,64 @@ public class TestDistributedShell { .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString()); } + /* + * NetUtils.getHostname() returns a string in the form "hostname/ip". + * Sometimes the hostname we get is the FQDN and sometimes the short name. In + * addition, on machines with multiple network interfaces, it runs any one of + * the ips. 
The function below compares the return values for + * NetUtils.getHostname() accounting for the conditions mentioned. + */ + private boolean checkHostname(String appHostname) throws Exception { + + String hostname = NetUtils.getHostname(); + if (hostname.equals(appHostname)) { + return true; + } + + Assert.assertTrue("Unknown format for hostname " + appHostname, + appHostname.contains("/")); + Assert.assertTrue("Unknown format for hostname " + hostname, + hostname.contains("/")); + + String[] appHostnameParts = appHostname.split("/"); + String[] hostnameParts = hostname.split("/"); + + return (compareFQDNs(appHostnameParts[0], hostnameParts[0]) && checkIPs( + hostnameParts[0], hostnameParts[1], appHostnameParts[1])); + } + + private boolean compareFQDNs(String appHostname, String hostname) + throws Exception { + if (appHostname.equals(hostname)) { + return true; + } + String appFQDN = InetAddress.getByName(appHostname).getCanonicalHostName(); + String localFQDN = InetAddress.getByName(hostname).getCanonicalHostName(); + return appFQDN.equals(localFQDN); + } + + private boolean checkIPs(String hostname, String localIP, String appIP) + throws Exception { + + if (localIP.equals(appIP)) { + return true; + } + boolean appIPCheck = false; + boolean localIPCheck = false; + InetAddress[] addresses = InetAddress.getAllByName(hostname); + for (InetAddress ia : addresses) { + if (ia.getHostAddress().equals(appIP)) { + appIPCheck = true; + continue; + } + if (ia.getHostAddress().equals(localIP)) { + localIPCheck = true; + } + } + return (appIPCheck && localIPCheck); + + } + @Test(timeout=90000) public void testDSRestartWithPreviousRunningContainers() throws Exception { String[] args = { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml Tue Aug 19 23:49:39 2014 @@ -50,24 +50,6 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>commons-el</groupId> - <artifactId>commons-el</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1-jetty</artifactId> - </exclusion> - </exclusions> </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml Tue Aug 19 23:49:39 2014 @@ -35,24 +35,6 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> 
<scope>provided</scope> - <exclusions> - <exclusion> - <groupId>commons-el</groupId> - <artifactId>commons-el</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1-jetty</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> @@ -75,14 +57,6 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> - <dependency> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> @@ -123,6 +97,12 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-common</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> <groupId>org.apache.hadoop</groupId> Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Tue Aug 19 23:49:39 2014 @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -37,12 +39,14 @@ import org.apache.hadoop.yarn.client.api import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; @InterfaceAudience.Public @InterfaceStability.Stable public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends AbstractService { + private static final Log LOG = LogFactory.getLog(AMRMClient.class); /** * Create a new instance of AMRMClient. @@ -207,14 +211,21 @@ public abstract class AMRMClient<T exten /** * Request additional containers and receive new container allocations. - * Requests made via <code>addContainerRequest</code> are sent to the - * <code>ResourceManager</code>. New containers assigned to the master are - * retrieved. Status of completed containers and node health updates are - * also retrieved. - * This also doubles up as a heartbeat to the ResourceManager and must be - * made periodically. - * The call may not always return any new allocations of containers. - * App should not make concurrent allocate requests. May cause request loss. + * Requests made via <code>addContainerRequest</code> are sent to the + * <code>ResourceManager</code>. New containers assigned to the master are + * retrieved. Status of completed containers and node health updates are also + * retrieved. 
This also doubles up as a heartbeat to the ResourceManager and + * must be made periodically. The call may not always return any new + * allocations of containers. App should not make concurrent allocate + * requests. May cause request loss. + * + * <p> + * Note : If the user has not removed container requests that have already + * been satisfied, then the re-register may end up sending the entire + * container requests to the RM (including matched requests). Which would mean + * the RM could end up giving it a lot of new allocated containers. + * </p> + * * @param progressIndicator Indicates progress made by the master * @return the response of the allocate request * @throws YarnException @@ -329,4 +340,63 @@ public abstract class AMRMClient<T exten return nmTokenCache; } + /** + * Wait for <code>check</code> to return true for each 1000 ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int)} + * and {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check + */ + public void waitFor(Supplier<Boolean> check) throws InterruptedException { + waitFor(check, 1000); + } + + /** + * Wait for <code>check</code> to return true for each + * <code>checkEveryMillis</code> ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check user defined checker + * @param checkEveryMillis interval to call <code>check</code> + */ + public void waitFor(Supplier<Boolean> check, int checkEveryMillis) + throws InterruptedException { + waitFor(check, checkEveryMillis, 1); + } + + /** + * Wait for <code>check</code> to return true for each + * <code>checkEveryMillis</code> ms. In the main loop, this method will log + * the message "waiting in main loop" for each <code>logInterval</code> times + * iteration to confirm the thread is alive. 
+ * @param check user defined checker + * @param checkEveryMillis interval to call <code>check</code> + * @param logInterval number of check iterations between log messages + */ + public void waitFor(Supplier<Boolean> check, int checkEveryMillis, + int logInterval) throws InterruptedException { + Preconditions.checkNotNull(check, "check should not be null"); + Preconditions.checkArgument(checkEveryMillis >= 0, + "checkEveryMillis should be positive value"); + Preconditions.checkArgument(logInterval >= 0, + "logInterval should be positive value"); + + int loggingCounter = logInterval; + do { + if (LOG.isDebugEnabled()) { + LOG.debug("Check the condition for main loop."); + } + + boolean result = check.get(); + if (result) { + LOG.info("Exits the main loop."); + return; + } + if (--loggingCounter <= 0) { + LOG.info("Waiting in main loop."); + loggingCounter = logInterval; + } + + Thread.sleep(checkEveryMillis); + } while (true); + } + } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java Tue Aug 19 23:49:39 2014 @@ -18,11 +18,15 @@ package org.apache.hadoop.yarn.client.api.async; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import java.io.IOException; import java.util.Collection; 
import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; @@ -90,6 +94,7 @@ import com.google.common.annotations.Vis @Stable public abstract class AMRMClientAsync<T extends ContainerRequest> extends AbstractService { + private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class); protected final AMRMClient<T> client; protected final CallbackHandler handler; @@ -189,6 +194,65 @@ extends AbstractService { */ public abstract int getClusterNodeCount(); + /** + * Wait for <code>check</code> to return true for each 1000 ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int)} + * and {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check + */ + public void waitFor(Supplier<Boolean> check) throws InterruptedException { + waitFor(check, 1000); + } + + /** + * Wait for <code>check</code> to return true for each + * <code>checkEveryMillis</code> ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check user defined checker + * @param checkEveryMillis interval to call <code>check</code> + */ + public void waitFor(Supplier<Boolean> check, int checkEveryMillis) + throws InterruptedException { + waitFor(check, checkEveryMillis, 1); + }; + + /** + * Wait for <code>check</code> to return true for each + * <code>checkEveryMillis</code> ms. In the main loop, this method will log + * the message "waiting in main loop" for each <code>logInterval</code> times + * iteration to confirm the thread is alive. 
+ * @param check user defined checker + * @param checkEveryMillis interval to call <code>check</code> + * @param logInterval number of check iterations between log messages + */ + public void waitFor(Supplier<Boolean> check, int checkEveryMillis, + int logInterval) throws InterruptedException { + Preconditions.checkNotNull(check, "check should not be null"); + Preconditions.checkArgument(checkEveryMillis >= 0, + "checkEveryMillis should be positive value"); + Preconditions.checkArgument(logInterval >= 0, + "logInterval should be positive value"); + + int loggingCounter = logInterval; + do { + if (LOG.isDebugEnabled()) { + LOG.debug("Check the condition for main loop."); + } + + boolean result = check.get(); + if (result) { + LOG.info("Exits the main loop."); + return; + } + if (--loggingCounter <= 0) { + LOG.info("Waiting in main loop."); + loggingCounter = logInterval; + } + + Thread.sleep(checkEveryMillis); + } while (true); + } + public interface CallbackHandler { /** Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java Tue Aug 19 23:49:39 2014 @@ -234,8 +234,7 @@ extends AMRMClientAsync<T> { while (true) { try { responseQueue.put(response); - if (response.getAMCommand() == 
AMCommand.AM_RESYNC - || response.getAMCommand() == AMCommand.AM_SHUTDOWN) { + if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) { return; } break; @@ -280,7 +279,6 @@ extends AMRMClientAsync<T> { if (response.getAMCommand() != null) { switch(response.getAMCommand()) { - case AM_RESYNC: case AM_SHUTDOWN: handler.onShutdownRequest(); LOG.info("Shutdown requested. Stopping callback."); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Tue Aug 19 23:49:39 2014 @@ -39,7 +39,9 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -47,20 +49,25 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -77,10 +84,18 @@ public class AMRMClientImpl<T extends Co private int lastResponseId = 0; + protected String appHostName; + protected int appHostPort; + protected String appTrackingUrl; + protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; protected int clusterNodeCount; + // blacklistedNodes is required for keeping history of blacklisted nodes that + // are sent to RM. On RESYNC command from RM, blacklistedNodes are used to get + // current blacklisted nodes and send back to RM. 
+ protected final Set<String> blacklistedNodes = new HashSet<String>(); protected final Set<String> blacklistAdditions = new HashSet<String>(); protected final Set<String> blacklistRemovals = new HashSet<String>(); @@ -150,6 +165,10 @@ public class AMRMClientImpl<T extends Co protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); protected final Set<ContainerId> release = new TreeSet<ContainerId>(); + // pendingRelease holds history of release requests. A request is removed only if + // RM sends completedContainer. + // How is it different from release? --> release is for per allocate() request. + protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>(); public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); @@ -185,19 +204,27 @@ public class AMRMClientImpl<T extends Co public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException { + this.appHostName = appHostName; + this.appHostPort = appHostPort; + this.appTrackingUrl = appTrackingUrl; Preconditions.checkArgument(appHostName != null, "The host name should not be null"); Preconditions.checkArgument(appHostPort >= -1, "Port number of the host" + " should be any integers larger than or equal to -1"); - // do this only once ??? 
+ + return registerApplicationMaster(); + } + + private RegisterApplicationMasterResponse registerApplicationMaster() + throws YarnException, IOException { RegisterApplicationMasterRequest request = - RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort, - appTrackingUrl); + RegisterApplicationMasterRequest.newInstance(this.appHostName, + this.appHostPort, this.appTrackingUrl); RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request); - synchronized (this) { - if(!response.getNMTokensFromPreviousAttempts().isEmpty()) { + lastResponseId = 0; + if (!response.getNMTokensFromPreviousAttempts().isEmpty()) { populateNMTokens(response.getNMTokensFromPreviousAttempts()); } } @@ -249,6 +276,25 @@ public class AMRMClientImpl<T extends Co } allocateResponse = rmClient.allocate(allocateRequest); + if (isResyncCommand(allocateResponse)) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + synchronized (this) { + release.addAll(this.pendingRelease); + blacklistAdditions.addAll(this.blacklistedNodes); + for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable + .values()) { + for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) { + for (ResourceRequestInfo request : capabalities.values()) { + addResourceRequestToAsk(request.remoteRequest); + } + } + } + } + // re register with RM + registerApplicationMaster(); + return allocate(progressIndicator); + } synchronized (this) { // update these on successful RPC @@ -258,6 +304,14 @@ public class AMRMClientImpl<T extends Co if (!allocateResponse.getNMTokens().isEmpty()) { populateNMTokens(allocateResponse.getNMTokens()); } + if (allocateResponse.getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAMRMToken()); + } + if (!pendingRelease.isEmpty() + && !allocateResponse.getCompletedContainersStatuses().isEmpty()) { + removePendingReleaseRequests(allocateResponse + 
.getCompletedContainersStatuses()); + } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -288,6 +342,18 @@ public class AMRMClientImpl<T extends Co return allocateResponse; } + protected void removePendingReleaseRequests( + List<ContainerStatus> completedContainersStatuses) { + for (ContainerStatus containerStatus : completedContainersStatuses) { + pendingRelease.remove(containerStatus.getContainerId()); + } + } + + private boolean isResyncCommand(AllocateResponse allocateResponse) { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } + @Private @VisibleForTesting protected void populateNMTokens(List<NMToken> nmTokens) { @@ -324,6 +390,12 @@ public class AMRMClientImpl<T extends Co } catch (InterruptedException e) { LOG.info("Interrupted while waiting for application" + " to be removed from RMStateStore"); + } catch (ApplicationMasterNotRegisteredException e) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + // re register with RM + registerApplicationMaster(); + unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl); } } @@ -414,6 +486,7 @@ public class AMRMClientImpl<T extends Co public synchronized void releaseAssignedContainer(ContainerId containerId) { Preconditions.checkArgument(containerId != null, "ContainerId can not be null."); + pendingRelease.add(containerId); release.add(containerId); } @@ -655,6 +728,7 @@ public class AMRMClientImpl<T extends Co if (blacklistAdditions != null) { this.blacklistAdditions.addAll(blacklistAdditions); + this.blacklistedNodes.addAll(blacklistAdditions); // if some resources are also in blacklistRemovals updated before, we // should remove them here. 
this.blacklistRemovals.removeAll(blacklistAdditions); @@ -662,6 +736,7 @@ public class AMRMClientImpl<T extends Co if (blacklistRemovals != null) { this.blacklistRemovals.addAll(blacklistRemovals); + this.blacklistedNodes.removeAll(blacklistRemovals); // if some resources are in blacklistAdditions before, we should remove // them here. this.blacklistAdditions.removeAll(blacklistRemovals); @@ -675,4 +750,16 @@ public class AMRMClientImpl<T extends Co "blacklistRemovals in updateBlacklist."); } } + + private void updateAMRMToken(Token token) throws IOException { + org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = + new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token + .getIdentifier().array(), token.getPassword().array(), new Text( + token.getKind()), new Text(token.getService())); + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + if (UserGroupInformation.isSecurityEnabled()) { + currentUGI = UserGroupInformation.getLoginUser(); + } + currentUGI.addToken(amrmToken); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java Tue Aug 19 23:49:39 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; 
+import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; @@ -29,8 +30,13 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; @@ -64,6 +70,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -74,6 +81,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AHSClient; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -82,8 +90,10 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.exceptions.YarnException; import 
org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -97,8 +107,11 @@ public class YarnClientImpl extends Yarn protected long submitPollIntervalMillis; private long asyncApiPollIntervalMillis; private long asyncApiPollTimeoutMillis; - protected AHSClient historyClient; + private AHSClient historyClient; private boolean historyServiceEnabled; + protected TimelineClient timelineClient; + protected Text timelineService; + protected boolean timelineServiceEnabled; private static final String ROOT = "root"; @@ -126,10 +139,17 @@ public class YarnClientImpl extends Yarn if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { historyServiceEnabled = true; - historyClient = AHSClientImpl.createAHSClient(); - historyClient.init(getConfig()); + historyClient = AHSClient.createAHSClient(); + historyClient.init(conf); } + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + timelineServiceEnabled = true; + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineService = TimelineUtils.buildTimelineTokenService(conf); + } super.serviceInit(conf); } @@ -141,6 +161,9 @@ public class YarnClientImpl extends Yarn if (historyServiceEnabled) { historyClient.start(); } + if (timelineServiceEnabled) { + timelineClient.start(); + } } catch (IOException e) { throw new YarnRuntimeException(e); } @@ -155,6 +178,9 @@ public class YarnClientImpl extends Yarn if (historyServiceEnabled) { historyClient.stop(); } + if (timelineServiceEnabled) { + timelineClient.stop(); 
+ } super.serviceStop(); } @@ -189,6 +215,12 @@ public class YarnClientImpl extends Yarn Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); + // Automatically add the timeline DT into the CLC + // Only when the security and the timeline service are both enabled + if (isSecurityEnabled() && timelineServiceEnabled) { + addTimelineDelegationToken(appContext.getAMContainerSpec()); + } + //TODO: YARN-1763:Handle RM failovers during the submitApplication call. rmClient.submitApplication(request); @@ -238,6 +270,48 @@ public class YarnClientImpl extends Yarn return applicationId; } + private void addTimelineDelegationToken( + ContainerLaunchContext clc) throws YarnException, IOException { + org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier> timelineDelegationToken = + timelineClient.getDelegationToken( + UserGroupInformation.getCurrentUser().getUserName()); + if (timelineDelegationToken == null) { + return; + } + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + ByteBuffer tokens = clc.getTokens(); + if (tokens != null) { + dibb.reset(tokens); + credentials.readTokenStorageStream(dibb); + tokens.rewind(); + } + // If the timeline delegation token is already in the CLC, no need to add + // one more + for (org.apache.hadoop.security.token.Token<? 
extends TokenIdentifier> token : credentials + .getAllTokens()) { + TokenIdentifier tokenIdentifier = token.decodeIdentifier(); + if (tokenIdentifier instanceof TimelineDelegationTokenIdentifier) { + return; + } + } + credentials.addToken(timelineService, timelineDelegationToken); + if (LOG.isDebugEnabled()) { + LOG.debug("Add timline delegation token into credentials: " + + timelineDelegationToken); + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + clc.setTokens(tokens); + } + + @Private + @VisibleForTesting + protected boolean isSecurityEnabled() { + return UserGroupInformation.isSecurityEnabled(); + } + @Override public void killApplication(ApplicationId applicationId) throws YarnException, IOException { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java Tue Aug 19 23:49:39 2014 @@ -304,7 +304,7 @@ public class ApplicationCLI extends Yarn containerReportStr.print("\tContainer-Id : "); containerReportStr.println(containerReport.getContainerId()); containerReportStr.print("\tStart-Time : "); - containerReportStr.println(containerReport.getStartTime()); + 
containerReportStr.println(containerReport.getCreationTime()); containerReportStr.print("\tFinish-Time : "); containerReportStr.println(containerReport.getFinishTime()); containerReportStr.print("\tState : "); @@ -525,7 +525,7 @@ public class ApplicationCLI extends Yarn "Finish Time", "State", "Host", "LOG-URL"); for (ContainerReport containerReport : appsReport) { writer.printf(CONTAINER_PATTERN, containerReport.getContainerId(), - containerReport.getStartTime(), containerReport.getFinishTime(), + containerReport.getCreationTime(), containerReport.getFinishTime(), containerReport.getContainerState(), containerReport .getAssignedNode(), containerReport.getLogUrl()); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java Tue Aug 19 23:49:39 2014 @@ -28,7 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; @@ -267,6 +267,7 @@ public abstract class ProtocolHATestBase protected void startHACluster(int numOfNMs, boolean overrideClientRMService, boolean overrideRTS, boolean overrideApplicationMasterService) throws Exception { 
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster = new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2, Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java Tue Aug 19 23:49:39 2014 @@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.client; import java.io.IOException; import java.util.ArrayList; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; @@ -54,11 +54,9 @@ public class TestApplicationMasterServic amClient = ClientRMProxy .createRMProxy(this.conf, ApplicationMasterProtocol.class); - AMRMTokenIdentifier id = - new AMRMTokenIdentifier(attemptId); Token<AMRMTokenIdentifier> appToken = - new Token<AMRMTokenIdentifier>(id, this.cluster.getResourceManager() - .getRMContext().getAMRMTokenSecretManager()); + this.cluster.getResourceManager().getRMContext() + .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); appToken.setService(new Text("appToken service")); UserGroupInformation.setLoginUser(UserGroupInformation .createRemoteUser(UserGroupInformation.getCurrentUser() Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java Tue Aug 19 23:49:39 2014 @@ -305,7 +305,8 @@ public class TestRMAdminCLI { testError(new String[] { "-help", "-getGroups" }, "Usage: yarn rmadmin [-getGroups [username]]", dataErr, 0); testError(new String[] { "-help", "-transitionToActive" }, - "Usage: yarn rmadmin [-transitionToActive <serviceId>]", dataErr, 0); + "Usage: yarn rmadmin [-transitionToActive <serviceId>" + + " [--forceactive]]", dataErr, 0); testError(new String[] { "-help", "-transitionToStandby" }, "Usage: yarn rmadmin [-transitionToStandby <serviceId>]", dataErr, 0); testError(new String[] { "-help", "-getServiceState" }, @@ -332,9 +333,9 @@ public class TestRMAdminCLI { "yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" + "UserGroupsConfiguration] [-refreshUserToGroupsMappings] " + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" + - " [username]] [-help [cmd]] [-transitionToActive <serviceId>]" + - " [-transitionToStandby <serviceId>] [-failover [--forcefence]" + - " [--forceactive] <serviceId> <serviceId>] " + + " [username]] [-help [cmd]] [-transitionToActive <serviceId>" + + " [--forceactive]] [-transitionToStandby <serviceId>] [-failover" + + " [--forcefence] [--forceactive] <serviceId> <serviceId>] " + 
"[-getServiceState <serviceId>] [-checkHealth <serviceId>]")); } finally { System.setOut(oldOutPrintStream); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java Tue Aug 19 23:49:39 2014 @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -42,6 +43,9 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; +import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; import org.junit.After; import org.junit.Assert; @@ -169,6 +173,7 @@ public class TestRMFailover extends Clie verifyConnections(); } + 
@SuppressWarnings("unchecked") @Test public void testAutomaticFailover() throws YarnException, InterruptedException, IOException { @@ -186,6 +191,26 @@ public class TestRMFailover extends Clie failover(); verifyConnections(); + + // Make the current Active handle an RMFatalEvent, + // so it transitions to standby. + ResourceManager rm = cluster.getResourceManager( + cluster.getActiveRMIndex()); + RMFatalEvent event = + new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED, + "Fake RMFatalEvent"); + rm.getRMContext().getDispatcher().getEventHandler().handle(event); + int maxWaitingAttempts = 2000; + while (maxWaitingAttempts-- > 0 ) { + if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) { + break; + } + Thread.sleep(1); + } + // Post-decrement leaves the counter at -1 only if every attempt was used up. + Assert.assertFalse("RM didn't transition to Standby ", + maxWaitingAttempts < 0); + verifyConnections(); } @Test Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java Tue Aug 19 23:49:39 2014 @@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.client; import java.io.IOException; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.records.NodeId; @@ -60,7 +60,7 @@ public class TestResourceTrackerOnHA 
ext // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null); + YarnVersionInfo.getVersion(), null, null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.client.api.async.impl; +import com.google.common.base.Supplier; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; @@ -180,7 +181,7 @@ public class TestAMRMClientAsync { AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class); when(client.allocate(anyFloat())).thenThrow(ex); - AMRMClientAsync<ContainerRequest> asyncClient = + AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); asyncClient.init(conf); asyncClient.start(); @@ -203,43 +204,35 @@ public class TestAMRMClientAsync { Assert.assertTrue(callbackHandler.callbackCount == 0); } - 
@Test//(timeout=10000) - public void testAMRMClientAsyncReboot() throws Exception { + @Test (timeout = 10000) + public void testAMRMClientAsyncShutDown() throws Exception { Configuration conf = new Configuration(); TestCallbackHandler callbackHandler = new TestCallbackHandler(); @SuppressWarnings("unchecked") AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class); - - final AllocateResponse rebootResponse = createAllocateResponse( + + final AllocateResponse shutDownResponse = createAllocateResponse( new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null); - rebootResponse.setAMCommand(AMCommand.AM_RESYNC); - when(client.allocate(anyFloat())).thenReturn(rebootResponse); - - AMRMClientAsync<ContainerRequest> asyncClient = - AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); + shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); + when(client.allocate(anyFloat())).thenReturn(shutDownResponse); + + AMRMClientAsync<ContainerRequest> asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler); asyncClient.init(conf); asyncClient.start(); - - synchronized (callbackHandler.notifier) { - asyncClient.registerApplicationMaster("localhost", 1234, null); - while(callbackHandler.reboot == false) { - try { - callbackHandler.notifier.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - + + asyncClient.registerApplicationMaster("localhost", 1234, null); + + Thread.sleep(50); + + verify(client, times(1)).allocate(anyFloat()); asyncClient.stop(); - // stopping should have joined all threads and completed all callbacks - Assert.assertTrue(callbackHandler.callbackCount == 0); } - + @Test (timeout = 10000) - public void testAMRMClientAsyncShutDown() throws Exception { + public void testAMRMClientAsyncShutDownWithWaitFor() throws Exception { Configuration conf = new Configuration(); - TestCallbackHandler callbackHandler = new TestCallbackHandler(); + final TestCallbackHandler callbackHandler = new 
TestCallbackHandler(); @SuppressWarnings("unchecked") AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class); @@ -253,9 +246,19 @@ public class TestAMRMClientAsync { asyncClient.init(conf); asyncClient.start(); + Supplier<Boolean> checker = new Supplier<Boolean>() { + @Override + public Boolean get() { + return callbackHandler.reboot; + } + }; + asyncClient.registerApplicationMaster("localhost", 1234, null); + asyncClient.waitFor(checker); - Thread.sleep(50); + asyncClient.stop(); + // stopping should have joined all threads and completed all callbacks + Assert.assertTrue(callbackHandler.callbackCount == 0); verify(client, times(1)).allocate(anyFloat()); asyncClient.stop(); @@ -295,6 +298,40 @@ public class TestAMRMClientAsync { } } + @Test (timeout = 5000) + public void testCallAMRMClientAsyncStopFromCallbackHandlerWithWaitFor() + throws YarnException, IOException, InterruptedException { + Configuration conf = new Configuration(); + final TestCallbackHandler2 callbackHandler = new TestCallbackHandler2(); + @SuppressWarnings("unchecked") + AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class); + + List<ContainerStatus> completed = Arrays.asList( + ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), + ContainerState.COMPLETE, "", 0)); + final AllocateResponse response = createAllocateResponse(completed, + new ArrayList<Container>(), null); + + when(client.allocate(anyFloat())).thenReturn(response); + + AMRMClientAsync<ContainerRequest> asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); + callbackHandler.asynClient = asyncClient; + asyncClient.init(conf); + asyncClient.start(); + + Supplier<Boolean> checker = new Supplier<Boolean>() { + @Override + public Boolean get() { + return callbackHandler.notify; + } + }; + + asyncClient.registerApplicationMaster("localhost", 1234, null); + asyncClient.waitFor(checker); + Assert.assertTrue(checker.get()); + } + void runCallBackThrowOutException(TestCallbackHandler2 
callbackHandler) throws InterruptedException, YarnException, IOException { Configuration conf = new Configuration(); @@ -375,7 +412,7 @@ public class TestAMRMClientAsync { private volatile List<ContainerStatus> completedContainers; private volatile List<Container> allocatedContainers; Exception savedException = null; - boolean reboot = false; + volatile boolean reboot = false; Object notifier = new Object(); int callbackCount = 0; @@ -465,7 +502,7 @@ public class TestAMRMClientAsync { @SuppressWarnings("rawtypes") AMRMClientAsync asynClient; boolean stop = true; - boolean notify = false; + volatile boolean notify = false; boolean throwOutException = false; @Override
