Added: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java?rev=374443&view=auto ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java (added) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java Thu Feb 2 09:58:38 2006 @@ -0,0 +1,445 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.mapred; + +import org.apache.nutch.io.*; +import org.apache.nutch.fs.*; +import org.apache.nutch.ipc.*; +import org.apache.nutch.util.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.logging.*; + + +//////////////////////////////////////////////////////// +// TaskInProgress maintains all the info needed for a +// Task in the lifetime of its owning Job. A given Task +// might be speculatively executed or reexecuted, so we +// need a level of indirection above the running-id itself. +// +// A given TaskInProgress contains multiple taskids, +// 0 or more of which might be executing at any one time. +// (That's what allows speculative execution.) A taskid +// is now *never* recycled. A TIP allocates enough taskids +// to account for all the speculation and failures it will +// ever have to handle. Once those are up, the TIP is dead. 
+// +//////////////////////////////////////////////////////// +class TaskInProgress { + static final int MAX_TASK_EXECS = 10; + static final int MAX_TASK_FAILURES = 4; + static final double SPECULATIVE_GAP = 0.2; + static final long SPECULATIVE_LAG = 60 * 1000; + + public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.mapred.TaskInProgress"); + + // Defines the TIP + String jobFile = null; + FileSplit split = null; + TaskInProgress predecessors[] = null; + int partition; + JobTracker jobtracker; + String id; + String totalTaskIds[]; + JobInProgress job; + + // Status of the TIP + int numTaskFailures = 0; + double progress = 0; + long startTime = 0; + int completes = 0; + boolean failed = false; + TreeSet usableTaskIds = new TreeSet(); + TreeSet recentTasks = new TreeSet(); + NutchConf nutchConf; + + TreeMap taskDiagnosticData = new TreeMap(); + TreeMap taskStatuses = new TreeMap(); + + TreeSet machinesWhereFailed = new TreeSet(); + TreeSet tasksReportedClosed = new TreeSet(); + + /** + * Constructor for MapTask + */ + public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, NutchConf nutchConf, JobInProgress job) { + this.jobFile = jobFile; + this.split = split; + this.jobtracker = jobtracker; + this.job = job; + this.nutchConf = nutchConf; + init(); + } + + /** + * Constructor for ReduceTask + */ + public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, NutchConf nutchConf, JobInProgress job) { + this.jobFile = jobFile; + this.predecessors = predecessors; + this.partition = partition; + this.jobtracker = jobtracker; + this.job = job; + this.nutchConf = nutchConf; + init(); + } + + /** + * Initialization common to Map and Reduce + */ + void init() { + this.startTime = System.currentTimeMillis(); + this.id = "tip_" + jobtracker.createUniqueId(); + this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES]; + for (int i = 0; i < totalTaskIds.length; i++) { + if (isMapTask()) { + totalTaskIds[i] = "task_m_" + jobtracker.createUniqueId(); + } else { + totalTaskIds[i] = "task_r_" + jobtracker.createUniqueId(); + } + usableTaskIds.add(totalTaskIds[i]); + } + } + + //////////////////////////////////// + // Accessors, info, profiles, etc. + //////////////////////////////////// + + /** + * Return the parent job + */ + public JobInProgress getJob() { + return job; + } + /** + * Return an ID for this task, not its component taskid-threads + */ + public String getTIPId() { + return this.id; + } + /** + * Whether this is a map task + */ + public boolean isMapTask() { + return split != null; + } + /** + */ + public boolean isComplete() { + return (completes > 0); + } + /** + */ + public boolean isComplete(String taskid) { + TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + if (status == null) { + return false; + } + return ((completes > 0) && (status.getRunState() == TaskStatus.SUCCEEDED)); + } + /** + */ + public boolean isFailed() { + return failed; + } + /** + * Number of times the TaskInProgress has failed. + */ + public int numTaskFailures() { + return numTaskFailures(); + } + /** + * Get the overall progress (from 0 to 1.0) for this TIP + */ + public double getProgress() { + return progress; + } + /** + * Returns whether a component task-thread should be + * closed because the containing JobInProgress has completed. 
+ */ + public boolean shouldCloseForClosedJob(String taskid) { + // If the thing has never been closed, + // and it belongs to this TIP, + // and this TIP is somehow FINISHED, + // then true + TaskStatus ts = (TaskStatus) taskStatuses.get(taskid); + if ((ts != null) && + (! tasksReportedClosed.contains(taskid)) && + (job.getStatus().getRunState() != JobStatus.RUNNING)) { + tasksReportedClosed.add(taskid); + return true; + } else { + return false; + } + } + + /** + * A TaskInProgress might be speculatively executed, and so + * can have many taskids simultaneously. Reduce tasks rely on knowing + * their predecessor ids, so they can be sure that all the previous + * work has been completed. + * + * But we don't know ahead of time which task id will actually be + * the one that completes for a given Map task. We don't want the + * Reduce task to have to be recreated after Map-completion, or check + * in with the JobTracker. So instead, each TaskInProgress preallocates + * all the task-ids it could ever want to run simultaneously. Then the + * Reduce task can be told about all the ids task-ids for a given Map + * TaskInProgress. If any of the Map TIP's tasks complete, the Reduce + * task will know all is well, and can continue. + * + * Most of the time, only a small number of the possible task-ids will + * ever be used. + */ + public String[] getAllPossibleTaskIds() { + return totalTaskIds; + } + + /** + * Creates a "status report" for this task. Includes the + * task ID and overall status, plus reports for all the + * component task-threads that have ever been started. + */ + Vector generateSingleReport() { + Vector report = new Vector(); + report.add(getTIPId()); + report.add("" + progress); + + report.add(new Integer(taskDiagnosticData.size())); + for (Iterator it = taskDiagnosticData.keySet().iterator(); it.hasNext(); ) { + String taskid = (String) it.next(); + Vector taskData = (Vector) taskDiagnosticData.get(taskid); + + TaskStatus taskStatus = (TaskStatus) taskStatuses.get(taskid); + String taskStateString = taskStatus.getStateString(); + + report.add(taskData); + report.add(taskStateString); + } + return report; + } + + //////////////////////////////////////////////// + // Update methods, usually invoked by the owning + // job. + //////////////////////////////////////////////// + /** + * A status message from a client has arrived. + * It updates the status of a single component-thread-task, + * which might result in an overall TaskInProgress status update. + */ + public void updateStatus(TaskStatus status) { + String taskid = status.getTaskId(); + String diagInfo = status.getDiagnosticInfo(); + if (diagInfo != null && diagInfo.length() > 0) { + Vector diagHistory = (Vector) taskDiagnosticData.get(taskid); + if (diagHistory == null) { + diagHistory = new Vector(); + taskDiagnosticData.put(taskid, diagHistory); + } + diagHistory.add(diagInfo); + } + taskStatuses.put(taskid, status); + + // Recompute progress + recomputeProgress(); + } + + /** + * Indicate that one of the taskids in this TaskInProgress + * has failed. 
+ */ + public void failedSubTask(String taskid, String trackerName) { + // + // Note the failure and its location + // + LOG.info("Task '" + taskid + "' has been lost."); + TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + if (status != null) { + status.setRunState(TaskStatus.FAILED); + } + this.recentTasks.remove(taskid); + this.completes--; + + numTaskFailures++; + if (numTaskFailures >= MAX_TASK_FAILURES) { + LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times."); + kill(); + } + machinesWhereFailed.add(trackerName); + + // Ask JobTracker to forget about this task + jobtracker.removeTaskEntry(taskid); + + recomputeProgress(); + } + + /** + * Indicate that one of the taskids in this TaskInProgress + * has successfully completed! + */ + public void completed(String taskid) { + LOG.info("Task '" + taskid + "' has completed."); + TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + status.setRunState(TaskStatus.SUCCEEDED); + recentTasks.remove(taskid); + + // + // Now that the TIP is complete, the other speculative + // subtasks will be closed when the owning tasktracker + // reports in and calls shouldClose() on this object. + // + + this.completes++; + recomputeProgress(); + } + + /** + * The TIP's been ordered kill()ed. + */ + public void kill() { + if (isComplete() || failed) { + return; + } + this.failed = true; + recomputeProgress(); + } + + /** + * This method is called whenever there's a status change + * for one of the TIP's sub-tasks. It recomputes the overall + * progress for the TIP. We examine all sub-tasks and find + * the one that's most advanced (and non-failed). + */ + void recomputeProgress() { + if (isComplete()) { + this.progress = 1; + } else if (failed) { + this.progress = 0; + } else { + double bestProgress = 0; + for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) { + String taskid = (String) it.next(); + TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + if (status.getRunState() == TaskStatus.SUCCEEDED) { + bestProgress = 1; + break; + } else if (status.getRunState() == TaskStatus.RUNNING) { + bestProgress = Math.max(bestProgress, status.getProgress()); + } + } + this.progress = bestProgress; + } + } + + ///////////////////////////////////////////////// + // "Action" methods that actually require the TIP + // to do something. + ///////////////////////////////////////////////// + /** + * Return whether this TIP has an NDFS cache-driven task + * to run at the given taskTracker. + */ + boolean hasTaskWithCacheHit(String taskTracker, TaskTrackerStatus tts) { + if (failed || isComplete() || recentTasks.size() > 0) { + return false; + } else { + try { + if (isMapTask()) { + NutchFileSystem fs = NutchFileSystem.get(nutchConf); + String hints[][] = fs.getFileCacheHints(split.getFile(), split.getStart(), split.getLength()); + for (int i = 0; i < hints.length; i++) { + for (int j = 0; j < hints[i].length; j++) { + if (hints[i][j].equals(tts.getHost())) { + return true; + } + } + } + } + } catch (IOException ie) { + } + return false; + } + } + /** + * Return whether this TIP has a non-speculative task to run + */ + boolean hasTask() { + if (failed || isComplete() || recentTasks.size() > 0) { + return false; + } else { + for (Iterator it = taskStatuses.values().iterator(); it.hasNext(); ) { + TaskStatus ts = (TaskStatus) it.next(); + if (ts.getRunState() == TaskStatus.RUNNING) { + return false; + } + } + return true; + } + } + /** + * Return whether the TIP has a speculative task to run. 
We + * only launch a speculative task if the current TIP is really + * far behind, and has been behind for a non-trivial amount of + * time. + */ + boolean hasSpeculativeTask(double averageProgress) { + // + // REMIND - mjc - these constants should be examined + // in more depth eventually... + // + if (isMapTask() && + (averageProgress - progress >= SPECULATIVE_GAP) && + (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) { + return true; + } + return false; + } + + /** + * Return a Task that can be sent to a TaskTracker for execution. + */ + public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) { + Task t = null; + if (hasTaskWithCacheHit(taskTracker, tts) || + hasTask() || + hasSpeculativeTask(avgProgress)) { + + String taskid = (String) usableTaskIds.first(); + usableTaskIds.remove(taskid); + + if (isMapTask()) { + t = new MapTask(jobFile, taskid, split); + } else { + String mapIdPredecessors[][] = new String[predecessors.length][]; + for (int i = 0; i < mapIdPredecessors.length; i++) { + mapIdPredecessors[i] = predecessors[i].getAllPossibleTaskIds(); + } + t = new ReduceTask(jobFile, taskid, mapIdPredecessors, partition); + } + t.setConf(nutchConf); + + recentTasks.add(taskid); + + // Ask JobTracker to note that the task exists + jobtracker.createTaskEntry(taskid, taskTracker, this); + } + return t; + } +}
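[Editor's note] The TaskInProgress header comment packs in two ideas: every task id a TIP could ever need is pre-allocated up front (so a reduce can be handed the full list once and accept output from whichever map attempt succeeds), and a speculative attempt is launched only when the TIP trails the job-wide average by SPECULATIVE_GAP after running for at least SPECULATIVE_LAG. Here is a minimal standalone sketch of both; the class, field, and id names are invented for illustration, and only the four constants come from the patch.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Standalone sketch -- not code from the patch -- of the id pre-allocation
 * and speculative-execution test described in the TaskInProgress comments.
 */
public class TipSketch {
    // Same constants the patch defines on TaskInProgress.
    static final int MAX_TASK_EXECS = 10;
    static final int MAX_TASK_FAILURES = 4;
    static final double SPECULATIVE_GAP = 0.2;
    static final long SPECULATIVE_LAG = 60 * 1000;

    final List<String> allIds = new ArrayList<String>();  // ids are never recycled
    final long startTime = System.currentTimeMillis();
    double progress = 0.0;                                 // best progress of any attempt

    TipSketch(String tipId) {
        // Enough ids for every speculative attempt and every allowed failure.
        for (int i = 0; i < MAX_TASK_EXECS + MAX_TASK_FAILURES; i++) {
            allIds.add(tipId + "_attempt_" + i);
        }
    }

    /** Mirrors hasSpeculativeTask(): far behind the average, and behind for a while. */
    boolean shouldSpeculate(double averageProgress, long now) {
        return (averageProgress - progress >= SPECULATIVE_GAP)
            && (now - startTime >= SPECULATIVE_LAG);
    }

    public static void main(String[] args) {
        TipSketch tip = new TipSketch("tip_0001");
        tip.progress = 0.3;
        long now = System.currentTimeMillis();
        // 40% behind the job average, but not yet running for SPECULATIVE_LAG:
        System.out.println(tip.shouldSpeculate(0.7, now));          // false
        // Same gap a bit over a minute later -> speculate:
        System.out.println(tip.shouldSpeculate(0.7, now + 61000L)); // true
        System.out.println(tip.allIds.size() + " task ids pre-allocated"); // 14
    }
}
```

In the patch itself, getTaskToRun() hands out the next unused id whenever any of hasTaskWithCacheHit(), hasTask(), or hasSpeculativeTask() reports there is work to do, so the lag and gap thresholds are the only thing separating speculative attempts from ordinary ones.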
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java Thu Feb 2 09:58:38 2006 @@ -33,6 +33,7 @@ public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.mapred.TaskRunner"); + boolean killed = false; private Process process; private Task t; private TaskTracker tracker; @@ -51,7 +52,7 @@ /** Called to assemble this task's input. This method is run in the parent * process before the child is spawned. It should not execute user code, * only system code. */ - public void prepare() throws IOException {} + public boolean prepare() throws IOException {return true;} /** Called when this task's output is no longer needed. * This method is run in the parent process after the child exits. It should @@ -62,7 +63,9 @@ public final void run() { try { - prepare(); + if (! prepare()) { + return; + } String sep = System.getProperty("path.separator"); File workDir = new File(new File(t.getJobFile()).getParent(), "work"); @@ -72,7 +75,7 @@ // start with same classpath as parent process classPath.append(System.getProperty("java.class.path")); classPath.append(sep); - + JobConf job = new JobConf(t.getJobFile()); String jar = job.getJar(); if (jar != null) { // if jar exists, it into workDir @@ -158,6 +161,7 @@ if (process != null) { process.destroy(); } + killed = true; } /** Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java Thu Feb 2 09:58:38 2006 @@ -34,6 +34,7 @@ public static final int UNASSIGNED = 3; private String taskid; + private boolean isMap; private float progress; private int runState; private String diagnosticInfo; @@ -41,8 +42,9 @@ public TaskStatus() {} - public TaskStatus(String taskid, float progress, int runState, String diagnosticInfo, String stateString) { + public TaskStatus(String taskid, boolean isMap, float progress, int runState, String diagnosticInfo, String stateString) { this.taskid = taskid; + this.isMap = isMap; this.progress = progress; this.runState = runState; this.diagnosticInfo = diagnosticInfo; @@ -50,6 +52,7 @@ } public String getTaskId() { return taskid; } + public boolean getIsMap() { return isMap; } public float getProgress() { return progress; } public void setProgress(float progress) { this.progress = progress; } public int getRunState() { return runState; } @@ -64,6 +67,7 @@ ////////////////////////////////////////////// public void write(DataOutput out) throws IOException { UTF8.writeString(out, taskid); + out.writeBoolean(isMap); out.writeFloat(progress); out.writeInt(runState); UTF8.writeString(out, diagnosticInfo); @@ -72,6 +76,7 @@ public void readFields(DataInput in) throws IOException { this.taskid = UTF8.readString(in); + this.isMap = in.readBoolean(); this.progress = in.readFloat(); this.runState = in.readInt(); this.diagnosticInfo = 
UTF8.readString(in); Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java Thu Feb 2 09:58:38 2006 @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nutch.mapred; + package org.apache.nutch.mapred; import org.apache.nutch.fs.*; import org.apache.nutch.io.*; @@ -33,7 +33,6 @@ * @author Mike Cafarella *******************************************************/ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable { - private int maxCurrentTask; static final long WAIT_FOR_DONE = 3 * 1000; private long taskTimeout; @@ -66,6 +65,8 @@ private NutchConf fConf; private MapOutputFile mapOutputFile; + private int maxCurrentTasks; + /** * Start with the local machine name, and the default JobTracker */ @@ -77,9 +78,10 @@ * Start with the local machine name, and the addr of the target JobTracker */ public TaskTracker(InetSocketAddress jobTrackAddr, NutchConf conf) throws IOException { + maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2); + this.fConf = conf; this.jobTrackAddr = jobTrackAddr; - this.maxCurrentTask = conf.getInt("mapred.tasktracker.tasks.maximum", 2); this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000); this.mapOutputFile = new MapOutputFile(); this.mapOutputFile.setConf(conf); @@ -93,6 +95,7 @@ */ void initialize() throws IOException { this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000); + LOG.info("Starting tracker " + taskTrackerName); this.localHostname = InetAddress.getLocalHost().getHostName(); new JobConf(this.fConf).deleteLocalFiles(SUBDIR); @@ -108,7 +111,7 @@ // RPC initialization while (true) { try { - this.taskReportServer = RPC.getServer(this, this.taskReportPort, this.maxCurrentTask, false, this.fConf); + this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf); this.taskReportServer.start(); break; } catch (BindException e) { @@ -119,7 +122,7 @@ } while (true) { try { - this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, this.maxCurrentTask, false, this.fConf); + this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, maxCurrentTasks, false, this.fConf); this.mapOutputServer.start(); break; } catch (BindException e) { @@ -142,9 +145,14 @@ * clean. */ public synchronized void close() throws IOException { - // Kill running tasks - while (tasks.size() > 0) { - TaskInProgress tip = (TaskInProgress)tasks.get(tasks.firstKey()); + // + // Kill running tasks. Do this in a 2nd vector, called 'tasksToClose', + // because calling jobHasFinished() may result in an edit to 'tasks'. + // + TreeMap tasksToClose = new TreeMap(); + tasksToClose.putAll(tasks); + for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) { + TaskInProgress tip = (TaskInProgress) it.next(); tip.jobHasFinished(); } @@ -154,11 +162,21 @@ } catch (InterruptedException ie) { } - // Shutdown local RPC servers - if (taskReportServer != null) { - taskReportServer.stop(); - taskReportServer = null; - } + // + // Shutdown local RPC servers. 
Do them + // in parallel, as RPC servers can take a long + // time to shutdown. (They need to wait a full + // RPC timeout, which might be 10-30 seconds.) + // + new Thread() { + public void run() { + if (taskReportServer != null) { + taskReportServer.stop(); + taskReportServer = null; + } + } + }.start(); + if (mapOutputServer != null) { mapOutputServer.stop(); mapOutputServer = null; @@ -227,7 +245,7 @@ // // Check if we should create a new Task // - if (runningTasks.size() < this.maxCurrentTask) { + if (runningTasks.size() < maxCurrentTasks) { Task t = jobClient.pollForNewTask(taskTrackerName); if (t != null) { TaskInProgress tip = new TaskInProgress(t, this.fConf); @@ -255,9 +273,16 @@ } // + // Check for any Tasks that should be killed, even if + // the containing Job is still ongoing. (This happens + // with speculative execution, when one version of the + // task finished before another + // + + // // Check for any Tasks whose job may have ended // - String toCloseId = jobClient.pollForClosedTask(taskTrackerName); + String toCloseId = jobClient.pollForTaskWithClosedJob(taskTrackerName); if (toCloseId != null) { synchronized (this) { TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId); @@ -288,7 +313,8 @@ staleState = true; } } catch (Exception ex) { - LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. ex=" + ex + " Retrying..."); + ex.printStackTrace(); + LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. Retrying..."); try { Thread.sleep(5000); } catch (InterruptedException ie) { @@ -373,7 +399,7 @@ /** */ public TaskStatus createStatus() { - TaskStatus status = new TaskStatus(task.getTaskId(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? "" : stateString); + TaskStatus status = new TaskStatus(task.getTaskId(), task.isMapTask(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? 
"" : stateString); if (diagnosticInfo.length() > 0) { diagnosticInfo = new StringBuffer(); } Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java Thu Feb 2 09:58:38 2006 @@ -77,6 +77,28 @@ public Iterator taskReports() { return taskReports.iterator(); } + + /** + * Return the current MapTask count + */ + public int countMapTasks() { + int mapCount = 0; + for (Iterator it = taskReports.iterator(); it.hasNext(); ) { + TaskStatus ts = (TaskStatus) it.next(); + if (ts.getIsMap()) { + mapCount++; + } + } + return mapCount; + } + + /** + * Return the current ReduceTask count + */ + public int countReduceTasks() { + return taskReports.size() - countMapTasks(); + } + /** */ public long getLastSeen() { Added: lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java?rev=374443&view=auto ============================================================================== --- lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java (added) +++ lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java Thu Feb 2 09:58:38 2006 @@ -0,0 +1,317 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.mapred; + +import org.apache.nutch.fs.*; +import org.apache.nutch.io.*; +import org.apache.nutch.util.*; +import org.apache.nutch.mapred.lib.*; + +import java.io.*; +import java.util.*; +import java.math.*; + +/********************************************************** + * MapredLoadTest generates a bunch of work that exercises + * a Nutch Map-Reduce system (and NDFS, too). It goes through + * the following steps: + * + * 1) Take inputs 'range' and 'counts'. + * 2) Generate 'counts' random integers between 0 and range-1. + * 3) Create a file that lists each integer between 0 and range-1, + * and lists the number of times that integer was generated. + * 4) Emit a (very large) file that contains all the integers + * in the order generated. + * 5) After the file has been generated, read it back and count + * how many times each int was generated. + * 6) Compare this big count-map against the original one. If + * they match, then SUCCESS! Otherwise, FAILURE! + * + * OK, that's how we can think about it. What are the map-reduce + * steps that get the job done? + * + * 1) In a non-mapred thread, take the inputs 'range' and 'counts'. + * 2) In a non-mapread thread, generate the answer-key and write to disk. + * 3) In a mapred job, divide the answer key into K jobs. 
+ * 4) A mapred 'generator' task consists of K map jobs. Each reads + * an individual "sub-key", and generates integers according to + * to it (though with a random ordering). + * 5) The generator's reduce task agglomerates all of those files + * into a single one. + * 6) A mapred 'reader' task consists of M map jobs. The output + * file is cut into M pieces. Each of the M jobs counts the + * individual ints in its chunk and creates a map of all seen ints. + * 7) A mapred job integrates all the count files into a single one. + * + **********************************************************/ +public class MapredLoadTest { + static class RandomGenMapper implements Mapper { + Random r = new Random(); + public void configure(JobConf job) { + } + + public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException { + int randomVal = ((IntWritable) key).get(); + int randomCount = ((IntWritable) val).get(); + + for (int i = 0; i < randomCount; i++) { + out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal)); + } + } + } + static class RandomGenReducer implements Reducer { + public void configure(JobConf job) { + } + + public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException { + int keyint = ((IntWritable) key).get(); + while (it.hasNext()) { + int val = ((IntWritable) it.next()).get(); + out.collect(new UTF8("" + val), new UTF8("")); + } + } + } + static class RandomCheckMapper implements Mapper { + public void configure(JobConf job) { + } + + public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException { + long pos = ((LongWritable) key).get(); + UTF8 str = (UTF8) val; + + out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1)); + } + } + static class RandomCheckReducer implements Reducer { + public void configure(JobConf job) { + } + + public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException { + int keyint = ((IntWritable) key).get(); + int count = 0; + while (it.hasNext()) { + it.next(); + count++; + } + out.collect(new IntWritable(keyint), new IntWritable(count)); + } + } + + int range; + int counts; + Random r = new Random(); + NutchConf nutchConf; + + /** + * MapredLoadTest + */ + public MapredLoadTest(int range, int counts, NutchConf nutchConf) throws IOException { + this.range = range; + this.counts = counts; + this.nutchConf = nutchConf; + } + + /** + * + */ + public void launch() throws IOException { + // + // Generate distribution of ints. This is the answer key. + // + int countsToGo = counts; + int dist[] = new int[range]; + for (int i = 0; i < range; i++) { + double avgInts = (1.0 * countsToGo) / (range - i); + dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian()))); + countsToGo -= dist[i]; + } + if (countsToGo > 0) { + dist[dist.length-1] += countsToGo; + } + + // + // Write the answer key to a file. 
+ // + NutchFileSystem fs = NutchFileSystem.get(nutchConf); + File testdir = new File("mapred.loadtest"); + fs.mkdirs(testdir); + + File randomIns = new File(testdir, "genins"); + fs.mkdirs(randomIns); + + File answerkey = new File(randomIns, "answer.key"); + SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey.getPath(), IntWritable.class, IntWritable.class); + try { + for (int i = 0; i < range; i++) { + out.append(new IntWritable(i), new IntWritable(dist[i])); + } + } finally { + out.close(); + } + + // + // Now we need to generate the random numbers according to + // the above distribution. + // + // We create a lot of map tasks, each of which takes at least + // one "line" of the distribution. (That is, a certain number + // X is to be generated Y number of times.) + // + // A map task emits Y key/val pairs. The val is X. The key + // is a randomly-generated number. + // + // The reduce task gets its input sorted by key. That is, sorted + // in random order. It then emits a single line of text that + // for the given values. It does not emit the key. + // + // Because there's just one reduce task, we emit a single big + // file of random numbers. + // + File randomOuts = new File(testdir, "genouts"); + fs.mkdirs(randomOuts); + + + JobConf genJob = new JobConf(nutchConf); + genJob.setInputDir(randomIns); + genJob.setInputKeyClass(IntWritable.class); + genJob.setInputValueClass(IntWritable.class); + genJob.setInputFormat(SequenceFileInputFormat.class); + genJob.setMapperClass(RandomGenMapper.class); + + genJob.setOutputDir(randomOuts); + genJob.setOutputKeyClass(IntWritable.class); + genJob.setOutputValueClass(IntWritable.class); + genJob.setOutputFormat(TextOutputFormat.class); + genJob.setReducerClass(RandomGenReducer.class); + genJob.setNumReduceTasks(1); + + JobClient.runJob(genJob); + + // + // Next, we read the big file in and regenerate the + // original map. + // + // We have many map tasks, each of which read at least one + // of the output numbers. For each number read in, the + // map task emits a key/value pair where the key is the + // number and the value is "1". + // + // We have a single reduce task, which receives its input + // sorted by the key emitted above. For each key, there will + // be a certain number of "1" values. The reduce task sums + // these values to compute how many times the given key was + // emitted. + // + // The reduce task then emits a key/val pair where the key + // is the number in question, and the value is the number of + // times the key was emitted. This is the same format as the + // original answer key (except that numbers emitted zero times + // will not appear in the regenerated key.) + // + File finalOuts = new File(testdir, "finalouts"); + fs.mkdirs(finalOuts); + JobConf checkJob = new JobConf(nutchConf); + checkJob.setInputDir(randomOuts); + checkJob.setInputKeyClass(LongWritable.class); + checkJob.setInputValueClass(UTF8.class); + checkJob.setInputFormat(TextInputFormat.class); + checkJob.setMapperClass(RandomCheckMapper.class); + + checkJob.setOutputDir(finalOuts); + checkJob.setOutputKeyClass(IntWritable.class); + checkJob.setOutputValueClass(IntWritable.class); + checkJob.setOutputFormat(SequenceFileOutputFormat.class); + checkJob.setReducerClass(RandomCheckReducer.class); + checkJob.setNumReduceTasks(1); + + JobClient.runJob(checkJob); + + // + // Finally, we compare the reconstructed answer key with the + // original one. Remember, we need to ignore zero-count items + // in the original key. 
+ // + boolean success = true; + File recomputedkey = new File(finalOuts, "part-00000"); + SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey.getPath(), nutchConf); + int totalseen = 0; + try { + IntWritable key = new IntWritable(); + IntWritable val = new IntWritable(); + for (int i = 0; i < range; i++) { + if (dist[i] == 0) { + continue; + } + if (! in.next(key, val)) { + System.err.println("Cannot read entry " + i); + success = false; + break; + } else { + if ( !((key.get() == i ) && (val.get() == dist[i]))) { + System.err.println("Mismatch! Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]); + success = false; + } + totalseen += val.get(); + } + } + if (success) { + if (in.next(key, val)) { + System.err.println("Unnecessary lines in recomputed key!"); + success = false; + } + } + } finally { + in.close(); + } + int originalTotal = 0; + for (int i = 0; i < dist.length; i++) { + originalTotal += dist[i]; + } + System.out.println("Original sum: " + originalTotal); + System.out.println("Recomputed sum: " + totalseen); + + // + // Write to "results" whether the test succeeded or not. + // + File resultFile = new File(testdir, "results"); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile))); + try { + bw.write("Success=" + success + "\n"); + System.out.println("Success=" + success); + } finally { + bw.close(); + } + } + + /** + * Launches all the tasks in order. + */ + public static void main(String[] argv) throws Exception { + if (argv.length < 2) { + System.err.println("Usage: MapredLoadTest <range> <counts>"); + System.err.println(); + System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>"); + return; + } + + int i = 0; + int range = Integer.parseInt(argv[i++]); + int counts = Integer.parseInt(argv[i++]); + + MapredLoadTest mlt = new MapredLoadTest(range, counts, new NutchConf()); + mlt.launch(); + } +} Modified: lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp (original) +++ lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp Thu Feb 2 09:58:38 2006 @@ -9,21 +9,28 @@ <% String jobid = request.getParameter("jobid"); JobTracker tracker = JobTracker.getTracker(); - JobTracker.JobInProgress job = (JobTracker.JobInProgress) tracker.getJob(jobid); + JobInProgress job = (JobInProgress) tracker.getJob(jobid); JobProfile profile = (job != null) ? (job.getProfile()) : null; JobStatus status = (job != null) ? (job.getStatus()) : null; - Vector mapTaskReports[] = tracker.getMapTaskReport(jobid); - Vector reduceTaskReports[] = tracker.getReduceTaskReport(jobid); + Vector mapTaskReports[] = (job != null) ? tracker.getMapTaskReport(jobid) : null; + Vector reduceTaskReports[] = (job != null) ? 
tracker.getReduceTaskReport(jobid) : null; %> <html> <title>Nutch MapReduce Job Details</title> <body> +<% + if (job == null) { + %> + No job found<br> + <% + } else { + %> <h1>Job '<%=jobid%>'</h1> <b>Job File:</b> <%=profile.getJobFile()%><br> -<b>Start time:</b> <%= new Date(job.getStartTime())%><br> +<b>The job started at:</b> <%= new Date(job.getStartTime())%><br> <% if (status.getRunState() == JobStatus.RUNNING) { out.print("The job is still running.<br>\n"); @@ -38,19 +45,28 @@ <h2>Map Tasks</h2> <center> <table border=2 cellpadding="5" cellspacing="2"> - <tr><td align="center">Map Task Id</td><td>Pct Complete</td><td>State</td><td>Diagnostic Text</td></tr> + <tr><td align="center">Map Task Id</td><td>Pct Complete</td><td>Diagnostic Data</td></tr> <% for (int i = 0; i < mapTaskReports.length; i++) { Vector v = mapTaskReports[i]; - out.print("<tr><td>" + v.elementAt(0) + "</td><td>" + v.elementAt(1) + "</td><td>" + v.elementAt(2) + "</td>"); - if (v.size() == 3) { - out.print("<td></td>"); - } else { - for (int j = 3; j < v.size(); j++) { - out.print("<td>" + v.elementAt(j) + "</td>"); + String tipid = (String) v.elementAt(0); + String progress = (String) v.elementAt(1); + int diagnosticSize = ((Integer) v.elementAt(2)).intValue(); + + out.print("<tr><td>" + tipid + "</td><td>" + progress + "</td><td>"); + for (int j = 0; j < diagnosticSize; j++) { + Vector taskData = (Vector) v.elementAt(3 + ((2 * j))); + String taskStateString = (String) v.elementAt(3 + ((2 * j) + 1)); + out.print(taskStateString); + out.print("<b>"); + + for (Iterator it2 = taskData.iterator(); it2.hasNext(); ) { + out.print("" + it2.next()); + out.println("<b>"); } } + out.print("</td>"); out.print("</tr>\n"); } %> @@ -62,25 +78,36 @@ <h2>Reduce Tasks</h2> <center> <table border=2 cellpadding="5" cellspacing="2"> - <tr><td align="center">Reduce Task Id</td><td>Pct Complete</td><td>State</td><td>Diagnostic Text</td></tr> + <tr><td align="center">Reduce Task Id</td><td>Pct Complete</td><td>Diagnostic Data</td></tr> <% for (int i = 0; i < reduceTaskReports.length; i++) { Vector v = reduceTaskReports[i]; - out.print("<tr><td>" + v.elementAt(0) + "</td><td>" + v.elementAt(1) + "</td><td>" + v.elementAt(2) + "</td>"); - if (v.size() == 3) { - out.print("<td></td>"); - } else { - for (int j = 3; j < v.size(); j++) { - out.print("<td>" + v.elementAt(j) + "</td>"); + String tipid = (String) v.elementAt(0); + String progress = (String) v.elementAt(1); + int diagnosticSize = ((Integer) v.elementAt(2)).intValue(); + + out.print("<tr><td>" + tipid + "</td><td>" + progress + "</td><td>"); + for (int j = 0; j < diagnosticSize; j++) { + Vector taskData = (Vector) v.elementAt(3 + ((2 * j))); + String taskStateString = (String) v.elementAt(3 + ((2 * j) + 1)); + out.print(taskStateString); + out.print("<b>"); + + for (Iterator it2 = taskData.iterator(); it2.hasNext(); ) { + out.print("" + it2.next()); + out.println("<b>"); } } + out.print("</td>"); out.print("</tr>\n"); } %> </table> </center> - + <% + } +%> <hr> <a href="/jobtracker.jsp">Go back to JobTracker</a><br> Modified: lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp (original) +++ lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp Thu Feb 2 09:58:38 2006 @@ -46,22 +46,22 @@ 
out.print("<tr><td align=\"center\" colspan=\"8\"><b>" + label + " Jobs </b></td></tr>\n"); if (jobs.size() > 0) { - out.print("<tr><td><b>Jobid</b></td><td><b>% complete</b></td><td><b>Required maps</b></td><td><b>maps attempted</b></td><td><b>maps completed</b></td><td><b>Required reduces</b></td><td><b>reduces attempted</b></td><td><b>reduces completed</b></td></tr>\n"); + out.print("<tr><td><b>Jobid</b></td><td><b>% complete</b></td><td><b>Required maps</b></td><td><b>maps completed</b></td><td><b>Required reduces</b></td><td><b>reduces completed</b></td></tr>\n"); for (Iterator it = jobs.iterator(); it.hasNext(); ) { - JobTracker.JobInProgress job = (JobTracker.JobInProgress) it.next(); + JobInProgress job = (JobInProgress) it.next(); JobProfile profile = job.getProfile(); JobStatus status = job.getStatus(); String jobid = profile.getJobId(); - float completedRatio = (100 * job.completedRatio()); + double completedRatio = (0.5 * (100 * status.mapProgress())) + + (0.5 * (100 * status.reduceProgress())); + int desiredMaps = job.desiredMaps(); - int attemptedMaps = job.attemptedMaps(); - int completedMaps = job.completedMaps(); int desiredReduces = job.desiredReduces(); - int attemptedReduces = job.attemptedReduces(); - int completedReduces = job.completedReduces(); + int completedMaps = job.finishedMaps(); + int completedReduces = job.finishedReduces(); - out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + jobid + "</a></td><td>" + completedRatio + "%</td><td>" + desiredMaps + "</td><td>" + attemptedMaps + "</td><td>" + completedMaps + "</td><td>" + desiredReduces + "</td><td>" + attemptedReduces + "</td><td> " + completedReduces + "</td></tr>\n"); + out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + jobid + "</a></td><td>" + completedRatio + "%</td><td>" + desiredMaps + "</td><td>" + completedMaps + "</td><td>" + desiredReduces + "</td><td> " + completedReduces + "</td></tr>\n"); } } else { out.print("<tr><td align=\"center\" colspan=\"8\"><i>none</i></td></tr>\n");