Author: mc
Date: Mon Aug 29 10:43:36 2005
New Revision: 264176
URL: http://svn.apache.org/viewcvs?rev=264176&view=rev
Log:
Synchronize the publicly-available methods in these
classes. They're not high-contention ones, so we won't
suffer performance-wise.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java?rev=264176&r1=264175&r2=264176&view=diff
==============================================================================
---
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
(original)
+++
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
Mon Aug 29 10:43:36 2005
@@ -329,7 +329,7 @@
/**
* Process incoming heartbeat messages from the task trackers.
*/
- public IntWritable emitHeartbeat(TaskTrackerStatus trackerStatus,
BooleanWritable initialContact) {
+ public synchronized IntWritable emitHeartbeat(TaskTrackerStatus
trackerStatus, BooleanWritable initialContact) {
String trackerName = trackerStatus.getTrackerName();
trackerStatus.setLastSeen(System.currentTimeMillis());
@@ -365,7 +365,7 @@
/**
* A tracker wants to know if there's a Task to run
*/
- public Task pollForNewTask(String trackerName) {
+ public synchronized Task pollForNewTask(String trackerName) {
//LOG.info("Unassigned tasks: " + unassignedTasks.size());
// Allocate a pending task to this TaskTracker
@@ -409,30 +409,34 @@
* map task outputs.
*
*/
- public MapOutputLocation[] locateMapOutputs(String taskId, String[]
mapTasksNeeded) {
+ public synchronized MapOutputLocation[] locateMapOutputs(String taskId,
String[] mapTasksNeeded) {
JobInProgress job = (JobInProgress) jobs.get((String)
taskToJobMap.get(taskId));
MapOutputLocation outputs[] = job.locateTasks(mapTasksNeeded);
return outputs;
}
- public JobTracker.JobInProgress getJob(String jobid) {
- return (JobInProgress) jobs.get(jobid);
+ /**
+ * Grab the local fs name
+ */
+ public synchronized String getFilesystemName() throws IOException {
+ return fs.getName();
}
+
////////////////////////////////////////////////////
// JobSubmissionProtocol
////////////////////////////////////////////////////
- public JobStatus submitJob(String jobFile) throws IOException {
+ public synchronized JobStatus submitJob(String jobFile) throws IOException
{
totalSubmissions++;
JobInProgress job = createJob(jobFile);
return job.getStatus();
}
- public void killJob(String jobid) {
+ public synchronized void killJob(String jobid) {
JobInProgress job = (JobInProgress) jobs.get(jobid);
job.kill();
}
- public JobProfile getJobProfile(String jobid) {
+ public synchronized JobProfile getJobProfile(String jobid) {
JobInProgress job = (JobInProgress) jobs.get(jobid);
if (job != null) {
return job.getProfile();
@@ -440,7 +444,7 @@
return null;
}
}
- public JobStatus getJobStatus(String jobid) {
+ public synchronized JobStatus getJobStatus(String jobid) {
JobInProgress job = (JobInProgress) jobs.get(jobid);
if (job != null) {
return job.getStatus();
@@ -448,7 +452,7 @@
return null;
}
}
- public Vector[] getMapTaskReport(String jobid) {
+ public synchronized Vector[] getMapTaskReport(String jobid) {
JobInProgress job = (JobInProgress) jobs.get(jobid);
if (job == null) {
return new Vector[0];
@@ -468,7 +472,7 @@
}
}
- public Vector[] getReduceTaskReport(String jobid) {
+ public synchronized Vector[] getReduceTaskReport(String jobid) {
JobInProgress job = (JobInProgress) jobs.get(jobid);
if (job == null) {
return new Vector[0];
@@ -488,6 +492,10 @@
}
}
+ //////////////////////////////////////////////////////////////
+ // (See InterTrackerProtocol section for getFilesystemName())
+ //////////////////////////////////////////////////////////////
+
Vector generateSingleReport(String taskid, TaskStatus status, Vector
diagInfo) {
Vector report = new Vector();
report.add(taskid);
@@ -496,13 +504,13 @@
return report;
}
- public String getFilesystemName() throws IOException {
- return fs.getName();
- }
-
///////////////////////////////////////////////////////////////
// JobTracker methods
///////////////////////////////////////////////////////////////
+ public JobTracker.JobInProgress getJob(String jobid) {
+ return (JobInProgress) jobs.get(jobid);
+ }
+
/**
* JobProfile createJob() kicks off a new job.
* This function creates a job profile and also decomposes it into
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=264176&r1=264175&r2=264176&view=diff
==============================================================================
---
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
(original)
+++
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Mon Aug 29 10:43:36 2005
@@ -491,7 +491,7 @@
/**
* Called upon startup by the child process, to fetch Task data.
*/
- public Task getTask(String taskid) throws IOException {
+ public synchronized Task getTask(String taskid) throws IOException {
TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
if (tip != null) {
return (Task) tip.getTask();
@@ -503,7 +503,7 @@
/**
* Called periodically to report Task progress, from 0.0 to 1.0.
*/
- public void progress(String taskid, float progress, String state) throws
IOException {
+ public synchronized void progress(String taskid, float progress, String
state) throws IOException {
TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
tip.reportProgress(progress, state);
}
@@ -512,24 +512,24 @@
* Called when the task dies before completion, and we want to report back
* diagnostic info
*/
- public void reportDiagnosticInfo(String taskid, String info) throws
IOException {
+ public synchronized void reportDiagnosticInfo(String taskid, String info)
throws IOException {
TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
tip.reportDiagnosticInfo(info);
}
+ /** Child checking to see if we're alive. Normally does nothing.*/
+ public synchronized void ping(String taskid) throws IOException {
+ if (tasks.get(taskid) == null) {
+ throw new IOException("No such task id."); // force child exit
+ }
+ }
+
/**
* The task is done.
*/
- public void done(String taskid) throws IOException {
+ public synchronized void done(String taskid) throws IOException {
TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
tip.reportDone();
- }
-
- /** Child checking to see if we're alive. Normally does nothing.*/
- public void ping(String taskid) throws IOException {
- if (tasks.get(taskid) == null) {
- throw new IOException("No such task id."); // force child exit
- }
}
/////////////////////////////////////////////////////