Author: mc
Date: Mon Aug 29 10:43:36 2005
New Revision: 264176

URL: http://svn.apache.org/viewcvs?rev=264176&view=rev
Log:

  Synchronize the publicly-available methods in these
classes.  They're not high-contention ones, so we won't
suffer performance-wise.


Modified:
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java?rev=264176&r1=264175&r2=264176&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java 
Mon Aug 29 10:43:36 2005
@@ -329,7 +329,7 @@
     /**
      * Process incoming heartbeat messages from the task trackers.
      */
-    public IntWritable emitHeartbeat(TaskTrackerStatus trackerStatus, 
BooleanWritable initialContact) {
+    public synchronized IntWritable emitHeartbeat(TaskTrackerStatus 
trackerStatus, BooleanWritable initialContact) {
         String trackerName = trackerStatus.getTrackerName();
         trackerStatus.setLastSeen(System.currentTimeMillis());
 
@@ -365,7 +365,7 @@
     /**
      * A tracker wants to know if there's a Task to run
      */
-    public Task pollForNewTask(String trackerName) {
+    public synchronized Task pollForNewTask(String trackerName) {
         //LOG.info("Unassigned tasks: " + unassignedTasks.size());
 
         // Allocate a pending task to this TaskTracker
@@ -409,30 +409,34 @@
      * map task outputs.
      * 
      */
-    public MapOutputLocation[] locateMapOutputs(String taskId, String[] 
mapTasksNeeded) {
+    public synchronized MapOutputLocation[] locateMapOutputs(String taskId, 
String[] mapTasksNeeded) {
         JobInProgress job = (JobInProgress) jobs.get((String) 
taskToJobMap.get(taskId));
         MapOutputLocation outputs[] = job.locateTasks(mapTasksNeeded);
         return outputs;
     }
 
-    public JobTracker.JobInProgress getJob(String jobid) {
-        return (JobInProgress) jobs.get(jobid);
+    /**
+     * Grab the local fs name
+     */
+    public synchronized String getFilesystemName() throws IOException {
+        return fs.getName();
     }
+
     ////////////////////////////////////////////////////
     // JobSubmissionProtocol
     ////////////////////////////////////////////////////
-    public JobStatus submitJob(String jobFile) throws IOException {
+    public synchronized JobStatus submitJob(String jobFile) throws IOException 
{
         totalSubmissions++;
         JobInProgress job = createJob(jobFile);
         return job.getStatus();
     }
 
-    public void killJob(String jobid) {
+    public synchronized void killJob(String jobid) {
         JobInProgress job = (JobInProgress) jobs.get(jobid);
         job.kill();
     }
 
-    public JobProfile getJobProfile(String jobid) {
+    public synchronized JobProfile getJobProfile(String jobid) {
         JobInProgress job = (JobInProgress) jobs.get(jobid);
         if (job != null) {
             return job.getProfile();
@@ -440,7 +444,7 @@
             return null;
         }
     }
-    public JobStatus getJobStatus(String jobid) {
+    public synchronized JobStatus getJobStatus(String jobid) {
         JobInProgress job = (JobInProgress) jobs.get(jobid);
         if (job != null) {
             return job.getStatus();
@@ -448,7 +452,7 @@
             return null;
         }
     }
-    public Vector[] getMapTaskReport(String jobid) {
+    public synchronized Vector[] getMapTaskReport(String jobid) {
         JobInProgress job = (JobInProgress) jobs.get(jobid);
         if (job == null) {
             return new Vector[0];
@@ -468,7 +472,7 @@
         }
     }
 
-    public Vector[] getReduceTaskReport(String jobid) {
+    public synchronized Vector[] getReduceTaskReport(String jobid) {
         JobInProgress job = (JobInProgress) jobs.get(jobid);
         if (job == null) {
             return new Vector[0];
@@ -488,6 +492,10 @@
         }
     }
 
+    //////////////////////////////////////////////////////////////
+    //  (See InterTrackerProtocol section for getFilesystemName())
+    //////////////////////////////////////////////////////////////
+
     Vector generateSingleReport(String taskid, TaskStatus status, Vector 
diagInfo) {
         Vector report = new Vector();
         report.add(taskid);
@@ -496,13 +504,13 @@
         return report;
     }
 
-    public String getFilesystemName() throws IOException {
-        return fs.getName();
-    }
-
     ///////////////////////////////////////////////////////////////
     // JobTracker methods
     ///////////////////////////////////////////////////////////////
+    public JobTracker.JobInProgress getJob(String jobid) {
+        return (JobInProgress) jobs.get(jobid);
+    }
+
     /**
      * JobProfile createJob() kicks off a new job.  
      * This function creates a job profile and also decomposes it into

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=264176&r1=264175&r2=264176&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java 
Mon Aug 29 10:43:36 2005
@@ -491,7 +491,7 @@
     /**
      * Called upon startup by the child process, to fetch Task data.
      */
-    public Task getTask(String taskid) throws IOException {
+    public synchronized Task getTask(String taskid) throws IOException {
         TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
         if (tip != null) {
             return (Task) tip.getTask();
@@ -503,7 +503,7 @@
     /**
      * Called periodically to report Task progress, from 0.0 to 1.0.
      */
-    public void progress(String taskid, float progress, String state) throws 
IOException {
+    public synchronized void progress(String taskid, float progress, String 
state) throws IOException {
         TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
         tip.reportProgress(progress, state);
     }
@@ -512,24 +512,24 @@
      * Called when the task dies before completion, and we want to report back
      * diagnostic info
      */
-    public void reportDiagnosticInfo(String taskid, String info) throws 
IOException {
+    public synchronized void reportDiagnosticInfo(String taskid, String info) 
throws IOException {
         TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
         tip.reportDiagnosticInfo(info);
     }
 
+    /** Child checking to see if we're alive.  Normally does nothing.*/
+    public synchronized void ping(String taskid) throws IOException {
+      if (tasks.get(taskid) == null) {
+        throw new IOException("No such task id."); // force child exit
+      }
+    }
+
     /**
      * The task is done.
      */
-    public void done(String taskid) throws IOException {
+    public synchronized void done(String taskid) throws IOException {
         TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
         tip.reportDone();
-    }
-
-    /** Child checking to see if we're alive.  Normally does nothing.*/
-    public void ping(String taskid) throws IOException {
-      if (tasks.get(taskid) == null) {
-        throw new IOException("No such task id."); // force child exit
-      }
     }
 
     /////////////////////////////////////////////////////


Reply via email to