Hey Vinod,

I svn up'd and rebuilt. My application's task (container) now runs!
Unfortunately, my application master eventually gets killed by the NodeManager anyway, and I'm still not clear as to why. The AM is just running a loop, asking for a container, and executing a command in the container. It keeps doing this over and over again. After a few iterations, it gets killed with something like:

2011-09-21 10:42:40,869 INFO monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(402)) - Memory usage of ProcessTree 21666 for container-id container_1316626117280_0002_01_000001 : Virtual 2260938752 bytes, limit : 2147483648 bytes; Physical 77398016 bytes, limit -1 bytes
2011-09-21 10:42:40,869 WARN monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:isProcessTreeOverLimit(289)) - Process tree for container: container_1316626117280_0002_01_000001 has processes older than 1 iteration running over the configured limit. Limit=2147483648, current usage = 2260938752
2011-09-21 10:42:40,870 WARN monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(453)) - Container [pid=21666,containerID=container_1316626117280_0002_01_000001] is running beyond memory-limits. Current usage : 2260938752bytes. Limit : 2147483648bytes. Killing container.
Dump of the process-tree for container_1316626117280_0002_01_000001 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 21669 21666 21666 21666 (java) 105 4 2152300544 18593 java -Xmx512M -cp ./package/* kafka.yarn.ApplicationMaster /home/criccomi/git/kafka-yarn/dist/kafka-streamer.tgz 2 1 1316626117280 com.linkedin.TODO 1
  |- 21666 20570 21666 21666 (bash) 0 0 108638208 303 /bin/bash -c java -Xmx512M -cp './package/*' kafka.yarn.ApplicationMaster /home/criccomi/git/kafka-yarn/dist/kafka-streamer.tgz 2 1 1316626117280 com.linkedin.TODO 1 1>/tmp/logs/application_1316626117280_0002/container_1316626117280_0002_01_000001/stdout 2>/tmp/logs/application_1316626117280_0002/container_1316626117280_0002_01_000001/stderr
2011-09-21 10:42:40,870 INFO monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(463)) - Removed ProcessTree with root 21666

I don't think that my AM is leaking memory. Full code paste after the break.

1. Do I need to release a container in my AM even if the AM receives it as a finished container in the resource request response?
2. Do I need to free any other resources after a resource request (e.g. ResourceRequest, AllocateRequest, etc.)?
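To make question 1 concrete, here is the kind of change I'm wondering about. This is an untested sketch against the ApplicationMasterHelper in my paste below; it assumes ContainerStatus exposes getContainerId (I believe it does) and relies on the same JavaConversions implicits my paste already uses. It just feeds the previous response's completed container IDs back as releases on the next allocate, instead of always passing an empty list:

import scala.collection.JavaConversions._
import org.apache.hadoop.yarn.api.records.ContainerId

// Untested sketch: remember what completed last time around, and hand those
// IDs to sendResourceRequest as the release list on the next iteration.
var toRelease = List[ContainerId]()

while (true) {
  val response = applicationMasterHelper.sendResourceRequest(tasks - runningContainers, toRelease)

  // launch response.getAllocatedContainers exactly as in the paste below
  // ...

  // explicitly release whatever the RM just reported as completed
  toRelease = response.getCompletedContainersStatuses.map(_.getContainerId).toList

  runningContainers += response.getAllocatedContainers.length
  runningContainers -= response.getCompletedContainersStatuses.length
  Thread.sleep(1000)
}

If releasing isn't actually required for containers that already came back as completed, I'd rather keep sending the empty list.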
Cheers,
Chris

def main(args: Array[String]) {
  // YARN will always give our ApplicationMaster
  // the first four parameters as: <package> <app id> <attempt id> <timestamp>
  val packagePath = args(0)
  val appId = args(1).toInt
  val attemptId = args(2).toInt
  val timestamp = args(3).toLong

  // these are our application master's parameters
  val streamerClass = args(4)
  val tasks = args(5).toInt

  // TODO log params here

  // start the application master helper
  val conf = new Configuration
  val applicationMasterHelper = new ApplicationMasterHelper(appId, attemptId, timestamp, conf)
    .registerWithResourceManager

  // start and manage the slaves
  val noReleases = List[ContainerId]()
  var runningContainers = 0

  // keep going forever
  while (true) {
    val nonRunningTasks = tasks - runningContainers
    val response = applicationMasterHelper.sendResourceRequest(nonRunningTasks, noReleases)

    response.getAllocatedContainers.foreach(container => {
      new ContainerExecutor(packagePath, container)
        .addCommand("java -Xmx256M -cp './package/*' kafka.yarn.StreamingTask " + streamerClass + " "
          + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
          + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr").execute(conf)
    })

    runningContainers += response.getAllocatedContainers.length
    runningContainers -= response.getCompletedContainersStatuses.length

    Thread.sleep(1000)
  }

  applicationMasterHelper.unregisterWithResourceManager("SUCCESS")
}

class ApplicationMasterHelper(iAppId: Int, iAppAttemptId: Int, lTimestamp: Long, conf: Configuration) {
  val rpc = YarnRPC.create(conf)
  val appId = Records.newRecord(classOf[ApplicationId])
  val appAttemptId = Records.newRecord(classOf[ApplicationAttemptId])
  val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
    YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
  val resourceManager = rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
  var requestId = 0

  appId.setClusterTimestamp(lTimestamp)
  appId.setId(iAppId)
  appAttemptId.setApplicationId(appId)
  appAttemptId.setAttemptId(iAppAttemptId)

  def registerWithResourceManager(): ApplicationMasterHelper = {
    val req = Records.newRecord(classOf[RegisterApplicationMasterRequest])
    req.setApplicationAttemptId(appAttemptId)
    // TODO not sure why these are blank - this is how Spark does it
    req.setHost("")
    req.setRpcPort(1)
    req.setTrackingUrl("")
    resourceManager.registerApplicationMaster(req)
    this
  }

  def unregisterWithResourceManager(state: String): ApplicationMasterHelper = {
    val finReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
    finReq.setAppAttemptId(appAttemptId)
    finReq.setFinalState(state)
    resourceManager.finishApplicationMaster(finReq)
    this
  }

  def sendResourceRequest(containers: Int, release: List[ContainerId]): AMResponse = {
    // TODO will need to make this more flexible for hostname requests, etc.
    val request = Records.newRecord(classOf[ResourceRequest])
    val pri = Records.newRecord(classOf[Priority])
    val capability = Records.newRecord(classOf[Resource])
    val req = Records.newRecord(classOf[AllocateRequest])

    request.setHostName("*")
    request.setNumContainers(containers)
    pri.setPriority(1)
    request.setPriority(pri)
    capability.setMemory(128)
    request.setCapability(capability)

    req.setResponseId(requestId)
    req.setApplicationAttemptId(appAttemptId)
    req.addAllAsks(Lists.newArrayList(request))
    req.addAllReleases(release)

    requestId += 1

    // TODO we might want to return a list of container executors here instead of AMResponses
    resourceManager.allocate(req).getAMResponse
  }
}
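One more data point on the numbers in the dump above (and in the older dump quoted below). Nothing new here, just the figures from the log added up:

// Virtual sizes copied from the ContainersMonitorImpl process-tree dump above.
val javaVmem = 2152300544L          // the java ApplicationMaster process (-Xmx512M)
val bashVmem = 108638208L           // the /bin/bash -c wrapper that launched it
val limit    = 2048L * 1024 * 1024  // the 2048 MB AM container resource = 2147483648 bytes

println(javaVmem + bashVmem)          // 2260938752, the "current usage" the monitor reports
println(javaVmem > limit)             // true: the JVM's virtual size alone is already over the limit
println(javaVmem + bashVmem > limit)  // true, so the container gets killed

So the virtual limit is exactly the 2048 MB I asked for, and the JVM's virtual footprint blows past it even though the heap is capped at 512M.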
________________________________________
From: Vinod Kumar Vavilapalli [vino...@hortonworks.com]
Sent: Wednesday, September 21, 2011 10:08 AM
To: mapreduce-dev@hadoop.apache.org
Subject: Re: ApplicationMaster Memory Usage

Yes, the process-dump clearly tells that this is MAPREDUCE-2998.

+Vinod
(With a smirk to see his container-memory-monitoring code in action)

On Wed, Sep 21, 2011 at 10:26 PM, Arun C Murthy <a...@hortonworks.com> wrote:
> I'll bet you are hitting MR-2998.
>
> From the changelog:
>
> MAPREDUCE-2998. Fixed a bug in TaskAttemptImpl which caused it to fork bin/mapred too many times. Contributed by Vinod K V.
>
> Arun
>
> On Sep 21, 2011, at 9:52 AM, Chris Riccomini wrote:
>
> > Hey Guys,
> >
> > My ApplicationMaster is being killed by the NodeManager because of memory consumption, and I don't understand why. I'm using -Xmx512M, and setting my resource request to 2048.
> >
> > .addCommand("java -Xmx512M -cp './package/*' kafka.yarn.ApplicationMaster " ...
> >
> > ...
> >
> > private var memory = 2048
> >
> > resource.setMemory(memory)
> > containerCtx.setResource(resource)
> > containerCtx.setCommands(cmds.toList)
> > containerCtx.setLocalResources(Collections.singletonMap("package", packageResource))
> > appCtx.setApplicationId(appId)
> > appCtx.setUser(user.getShortUserName)
> > appCtx.setAMContainerSpec(containerCtx)
> > request.setApplicationSubmissionContext(appCtx)
> > applicationsManager.submitApplication(request)
> >
> > When this runs, I see (in my NodeManager's logs):
> >
> > 2011-09-21 09:35:19,112 INFO monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(402)) - Memory usage of ProcessTree 28134 for container-id container_1316559026783_0003_01_000001 : Virtual 2260938752 bytes, limit : 2147483648 bytes; Physical 71540736 bytes, limit -1 bytes
> > 2011-09-21 09:35:19,112 WARN monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:isProcessTreeOverLimit(289)) - Process tree for container: container_1316559026783_0003_01_000001 has processes older than 1 iteration running over the configured limit. Limit=2147483648, current usage = 2260938752
> > 2011-09-21 09:35:19,113 WARN monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(453)) - Container [pid=28134,containerID=container_1316559026783_0003_01_000001] is running beyond memory-limits. Current usage : 2260938752bytes. Limit : 2147483648bytes. Killing container.
> > Dump of the process-tree for container_1316559026783_0003_01_000001 :
> > |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> > |- 28134 25886 28134 28134 (bash) 0 0 108638208 303 /bin/bash -c java -Xmx512M -cp './package/*' kafka.yarn.ApplicationMaster 3 1 1316559026783 com.linkedin.TODO 1 1>/tmp/logs/application_1316559026783_0003/container_1316559026783_0003_01_000001/stdout 2>/tmp/logs/application_1316559026783_0003/container_1316559026783_0003_01_000001/stderr
> > |- 28137 28134 28134 28134 (java) 92 3 2152300544 17163 java -Xmx512M -cp ./package/* kafka.yarn.ApplicationMaster 3 1 1316559026783 com.linkedin.TODO 1
> >
> > 2011-09-21 09:35:19,113 INFO monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(463)) - Removed ProcessTree with root 28134
> >
> > It appears that YARN is honoring my 2048 setting, yet my process is somehow taking 2260938752 bytes. I don't think that I'm using nearly that much in permgen, and my heap is limited to 512M. I don't have any JNI stuff running (that I know of), so it's unclear to me what's going on here. The only thing that I can think of is that Java's Runtime exec is forking, and copying its entire JVM memory footprint for the fork.
> >
> > Has anyone seen this? Am I doing something dumb?
> >
> > Thanks!
> > Chris
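PS, on the Runtime exec theory at the bottom of the quoted thread: this is roughly the kind of call I had in mind. It is purely illustrative (my ContainerExecutor does its own thing), and the comment is my guess at the mechanism, not something I've verified:

// Hypothetical illustration of the fork theory, not my actual launch code.
// The guess: Runtime.exec / ProcessBuilder fork the JVM on Linux before
// exec'ing the child, so for a brief window the child's virtual size in /proc
// looks like the parent JVM's, and a process-tree snapshot taken in that
// window could count the AM's ~2 GB of virtual memory twice.
val pb = new ProcessBuilder("/bin/bash", "-c", "sleep 1")
val proc = pb.start()
proc.waitFor()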