Author: cutting
Date: Mon Aug 29 20:08:46 2005
New Revision: 264685

URL: http://svn.apache.org/viewcvs?rev=264685&view=rev
Log:
Synchronize things in TaskTracker.offerService() loop. Also remove boxing in the heartbeat RPC.
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java?rev=264685&r1=264684&r2=264685&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java Mon Aug 29 20:08:46 2005 @@ -35,7 +35,7 @@ * TaskTracker must also indicate whether this is the first interaction * (since state refresh) */ - IntWritable emitHeartbeat(TaskTrackerStatus status, BooleanWritable initialContact); + int emitHeartbeat(TaskTrackerStatus status, boolean initialContact); /** Called to get new tasks from from the job tracker for this tracker.*/ Task pollForNewTask(String trackerName); Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java?rev=264685&r1=264684&r2=264685&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java Mon Aug 29 20:08:46 2005 @@ -329,13 +329,13 @@ /** * Process incoming heartbeat messages from the task trackers. 
*/ - public synchronized IntWritable emitHeartbeat(TaskTrackerStatus trackerStatus, BooleanWritable initialContact) { + public synchronized int emitHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact) { String trackerName = trackerStatus.getTrackerName(); trackerStatus.setLastSeen(System.currentTimeMillis()); synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { - if (initialContact.get()) { + if (initialContact) { // If it's first contact, then clear out any state hanging around if (taskTrackers.get(trackerName) != null) { taskTrackers.remove(trackerName); @@ -344,14 +344,14 @@ } else { // If not first contact, there should be some record of the tracker if (taskTrackers.get(trackerName) == null) { - return new IntWritable(InterTrackerProtocol.UNKNOWN_TASKTRACKER); + return InterTrackerProtocol.UNKNOWN_TASKTRACKER; } } // Store latest state. If first contact, then save current // state in expiry queue taskTrackers.put(trackerName, trackerStatus); - if (initialContact.get()) { + if (initialContact) { trackerExpiryQueue.add(trackerStatus); } } @@ -359,7 +359,7 @@ updateTaskStatuses(trackerStatus); //LOG.info("Got heartbeat from "+trackerName); - return new IntWritable(InterTrackerProtocol.TRACKERS_OK); + return InterTrackerProtocol.TRACKERS_OK; } /** Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=264685&r1=264684&r2=264685&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Mon Aug 29 20:08:46 2005 @@ -124,7 +124,7 @@ * within the same process space might be restarted, so everything must be * clean. 
*/ - public void close() throws IOException { + public synchronized void close() throws IOException { // Kill running tasks Vector v = new Vector(); for (Iterator it = tasks.values().iterator(); it.hasNext(); ) { @@ -186,7 +186,7 @@ // Emit standard hearbeat message to check in with JobTracker // Vector taskReports = new Vector(); - synchronized (runningTasks) { + synchronized (this) { for (Iterator it = runningTasks.keySet().iterator(); it.hasNext(); ) { String taskid = (String) it.next(); TaskInProgress tip = (TaskInProgress) runningTasks.get(taskid); @@ -204,11 +204,11 @@ if (justStarted) { this.fs = NutchFileSystem.getNamed(jobClient.getFilesystemName()); } - - IntWritable resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), new BooleanWritable(justStarted)); + + int resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), justStarted); justStarted = false; - - if (resultCode.get() == InterTrackerProtocol.UNKNOWN_TASKTRACKER) { + + if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) { return STALE_STATE; } @@ -219,8 +219,10 @@ Task t = jobClient.pollForNewTask(taskTrackerName); if (t != null) { TaskInProgress tip = new TaskInProgress(t); - tasks.put(t.getTaskId(), tip); - runningTasks.put(t.getTaskId(), tip); + synchronized (this) { + tasks.put(t.getTaskId(), tip); + runningTasks.put(t.getTaskId(), tip); + } tip.launchTask(); } } @@ -228,7 +230,7 @@ // // Kill any tasks that have not reported progress in the last X seconds. 
// - synchronized (runningTasks) { + synchronized (this) { for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); if ((tip.getRunState() == TaskStatus.RUNNING) && @@ -245,8 +247,10 @@ // String toCloseId = jobClient.pollForClosedTask(taskTrackerName); if (toCloseId != null) { + synchronized (this) { TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId); tip.cleanup(); + } } lastHeartbeat = now; } @@ -538,7 +542,7 @@ /** * The task is no longer running. It may not have completed successfully */ - void reportTaskFinished(String taskid) { + synchronized void reportTaskFinished(String taskid) { TaskInProgress tip = (TaskInProgress) tasks.get(taskid); tip.taskFinished(); }