Author: cutting Date: Wed Jul 20 11:23:25 2005 New Revision: 219958 URL: http://svn.apache.org/viewcvs?rev=219958&view=rev Log: Make task child exit promptly when parent is gone.
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=219958&r1=219957&r2=219958&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java Wed Jul 20 11:23:25 2005 @@ -115,6 +115,8 @@ // Ignore for now } + public void ping(String taskid) throws IOException {} + public void done(String taskId) throws IOException { int taskIndex = mapIds.indexOf(taskId); if (taskIndex >= 0) { // mapping Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java?rev=219958&r1=219957&r2=219958&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java Wed Jul 20 11:23:25 2005 @@ -56,6 +56,14 @@ MapOutputLocation[] locs = jobClient.locateMapOutputs(task.getTaskId(), neededStrings); + if (locs.length == 0) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + continue; + } + LOG.info("Got "+locs.length+" map output locations."); // try each of these locations Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=219958&r1=219957&r2=219958&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Wed Jul 20 11:23:25 2005 @@ -489,6 +489,13 @@ tip.reportDone(); } + /** Child checking to see if we're alive. Normally does nothing.*/ + public void ping(String taskid) throws IOException { + if (tasks.get(taskid) == null) { + throw new IOException("No such task id."); // force child exit + } + } + ///////////////////////////////////////////////////// // Called by TaskTracker thread after task process ends ///////////////////////////////////////////////////// @@ -516,8 +523,11 @@ Task task = umbilical.getTask(taskid); JobConf job = new JobConf(task.getJobFile()); + + startPinging(umbilical, taskid); // start pinging parent + try { - task.run(job, umbilical); // run the task + task.run(job, umbilical); // run the task } catch (Throwable throwable) { LOG.log(Level.WARNING, "Failed to spawn child", throwable); // Report back any failures, for diagnostic purposes @@ -526,6 +536,29 @@ umbilical.reportDiagnosticInfo(taskid, baos.toString()); } umbilical.done(taskid); + } + + /** Periodically ping parent and exit when this fails.*/ + private static void startPinging(final TaskUmbilicalProtocol umbilical, + final String taskid) { + Thread thread = new Thread(new Runnable() { + public void run() { + while (true) { + try { + umbilical.ping(taskid); + } catch (Throwable t) { + LOG.warning("Parent died. Exiting "+taskid); + System.exit(1); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + } + }, "Pinger for "+taskid); + thread.setDaemon(true); + thread.start(); } } Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java?rev=219958&r1=219957&r2=219958&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java Wed Jul 20 11:23:25 2005 @@ -39,6 +39,9 @@ */ void reportDiagnosticInfo(String taskid, String trace) throws IOException; + /** Periodically called by child to check if parent is still alive. */ + void ping(String taskid) throws IOException; + /** Report that the task is successfully completed. Failure is assumed if * the task process exits without calling this. */ void done(String taskid) throws IOException;