Added: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java?rev=374443&view=auto ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java (added) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java Thu Feb 2 09:58:38 2006 @@ -0,0 +1,445 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.mapred; + +import org.apache.nutch.io.*; +import org.apache.nutch.fs.*; +import org.apache.nutch.ipc.*; +import org.apache.nutch.util.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.logging.*; + + +//////////////////////////////////////////////////////// +// TaskInProgress maintains all the info needed for a +// Task in the lifetime of its owning Job. A given Task +// might be speculatively executed or reexecuted, so we +// need a level of indirection above the running-id itself. +// +// A given TaskInProgress contains multiple taskids, +// 0 or more of which might be executing at any one time. +// (That's what allows speculative execution.) A taskid +// is now *never* recycled. A TIP allocates enough taskids +// to account for all the speculation and failures it will +// ever have to handle. Once those are up, the TIP is dead. 
+// +//////////////////////////////////////////////////////// +class TaskInProgress { + static final int MAX_TASK_EXECS = 10; + static final int MAX_TASK_FAILURES = 4; + static final double SPECULATIVE_GAP = 0.2; + static final long SPECULATIVE_LAG = 60 * 1000; + + public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.mapred.TaskInProgress"); + + // Defines the TIP + String jobFile = null; + FileSplit split = null; + TaskInProgress predecessors[] = null; + int partition; + JobTracker jobtracker; + String id; + String totalTaskIds[]; + JobInProgress job; + + // Status of the TIP + int numTaskFailures = 0; + double progress = 0; + long startTime = 0; + int completes = 0; + boolean failed = false; + TreeSet usableTaskIds = new TreeSet(); + TreeSet recentTasks = new TreeSet(); + NutchConf nutchConf; + + TreeMap taskDiagnosticData = new TreeMap(); + TreeMap taskStatuses = new TreeMap(); + + TreeSet machinesWhereFailed = new TreeSet(); + TreeSet tasksReportedClosed = new TreeSet(); + + /** + * Constructor for MapTask + */ + public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, NutchConf nutchConf, JobInProgress job) { + this.jobFile = jobFile; + this.split = split; + this.jobtracker = jobtracker; + this.job = job; + this.nutchConf = nutchConf; + init(); + } + + /** + * Constructor for ReduceTask + */ + public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, NutchConf nutchConf, JobInProgress job) { + this.jobFile = jobFile; + this.predecessors = predecessors; + this.partition = partition; + this.jobtracker = jobtracker; + this.job = job; + this.nutchConf = nutchConf; + init(); + } + + /** + * Initialization common to Map and Reduce + */ + void init() { + this.startTime = System.currentTimeMillis(); + this.id = "tip_" + jobtracker.createUniqueId(); + this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES]; + for (int i = 0; i < totalTaskIds.length; i++) { + if (isMapTask()) { + totalTaskIds[i] = "task_m_" + jobtracker.createUniqueId(); + } else { + totalTaskIds[i] = "task_r_" + jobtracker.createUniqueId(); + } + usableTaskIds.add(totalTaskIds[i]); + } + } + + //////////////////////////////////// + // Accessors, info, profiles, etc. + //////////////////////////////////// + + /** + * Return the parent job + */ + public JobInProgress getJob() { + return job; + } + /** + * Return an ID for this task, not its component taskid-threads + */ + public String getTIPId() { + return this.id; + } + /** + * Whether this is a map task + */ + public boolean isMapTask() { + return split != null; + } + /** + */ + public boolean isComplete() { + return (completes > 0); + } + /** + */ + public boolean isComplete(String taskid) { + TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + if (status == null) { + return false; + } + return ((completes > 0) && (status.getRunState() == TaskStatus.SUCCEEDED)); + } + /** + */ + public boolean isFailed() { + return failed; + } + /** + * Number of times the TaskInProgress has failed. + */ + public int numTaskFailures() { + return numTaskFailures(); + } + /** + * Get the overall progress (from 0 to 1.0) for this TIP + */ + public double getProgress() { + return progress; + } + /** + * Returns whether a component task-thread should be + * closed because the containing JobInProgress has completed. 
+ */ + public boolean shouldCloseForClosedJob(String taskid) { + // If the thing has never been closed, + // and it belongs to this TIP, + // and this TIP is somehow FINISHED, + // then true + TaskStatus ts = (TaskStatus) taskStatuses.get(taskid); + if ((ts != null) && + (! tasksReportedClosed.contains(taskid)) && + (job.getStatus().getRunState() != JobStatus.RUNNING)) { + tasksReportedClosed.add(taskid); + return true; + } else { + return false; + } + } + + /** + * A TaskInProgress might be speculatively executed, and so + * can have many taskids simultaneously. Reduce tasks rely on knowing + * their predecessor ids, so they can be sure that all the previous + * work has been completed. + * + * But we don't know ahead of time which task id will actually be + * the one that completes for a given Map task. We don't want the + * Reduce task to have to be recreated after Map-completion, or check + * in with the JobTracker. So instead, each TaskInProgress preallocates + * all the task-ids it could ever want to run simultaneously. Then the + * Reduce task can be told about all the ids task-ids for a given Map + * TaskInProgress. If any of the Map TIP's tasks complete, the Reduce + * task will know all is well, and can continue. + * + * Most of the time, only a small number of the possible task-ids will + * ever be used. + */ + public String[] getAllPossibleTaskIds() { + return totalTaskIds; + } + + /** + * Creates a "status report" for this task. Includes the + * task ID and overall status, plus reports for all the + * component task-threads that have ever been started. + */ + Vector generateSingleReport() { + Vector report = new Vector(); + report.add(getTIPId()); + report.add("" + progress); + + report.add(new Integer(taskDiagnosticData.size())); + for (Iterator it = taskDiagnosticData.keySet().iterator(); it.hasNext(); ) { + String taskid = (String) it.next(); + Vector taskData = (Vector) taskDiagnosticData.get(taskid); + + TaskStatus taskStatus = (TaskStatus) taskStatuses.get(taskid); + String taskStateString = taskStatus.getStateString(); + + report.add(taskData); + report.add(taskStateString); + } + return report; + } + + //////////////////////////////////////////////// + // Update methods, usually invoked by the owning + // job. + //////////////////////////////////////////////// + /** + * A status message from a client has arrived. + * It updates the status of a single component-thread-task, + * which might result in an overall TaskInProgress status update. + */ + public void updateStatus(TaskStatus status) { + String taskid = status.getTaskId(); + String diagInfo = status.getDiagnosticInfo(); + if (diagInfo != null && diagInfo.length() > 0) { + Vector diagHistory = (Vector) taskDiagnosticData.get(taskid); + if (diagHistory == null) { + diagHistory = new Vector(); + taskDiagnosticData.put(taskid, diagHistory); + } + diagHistory.add(diagInfo); + } + taskStatuses.put(taskid, status); + + // Recompute progress + recomputeProgress(); + } + + /** + * Indicate that one of the taskids in this TaskInProgress + * has failed. 
+ */ + public void failedSubTask(String taskid, String trackerName) { + // + // Note the failure and its location + // + LOG.info("Task '" + taskid + "' has been lost."); + TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + if (status != null) { + status.setRunState(TaskStatus.FAILED); + } + this.recentTasks.remove(taskid); + this.completes--; + + numTaskFailures++; + if (numTaskFailures >= MAX_TASK_FAILURES) { + LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times."); + kill(); + } + machinesWhereFailed.add(trackerName); + + // Ask JobTracker to forget about this task + jobtracker.removeTaskEntry(taskid); + + recomputeProgress(); + } + + /** + * Indicate that one of the taskids in this TaskInProgress + * has successfully completed! + */ + public void completed(String taskid) { + LOG.info("Task '" + taskid + "' has completed."); + TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + status.setRunState(TaskStatus.SUCCEEDED); + recentTasks.remove(taskid); + + // + // Now that the TIP is complete, the other speculative + // subtasks will be closed when the owning tasktracker + // reports in and calls shouldClose() on this object. + // + + this.completes++; + recomputeProgress(); + } + + /** + * The TIP's been ordered kill()ed. + */ + public void kill() { + if (isComplete() || failed) { + return; + } + this.failed = true; + recomputeProgress(); + } + + /** + * This method is called whenever there's a status change + * for one of the TIP's sub-tasks. It recomputes the overall + * progress for the TIP. We examine all sub-tasks and find + * the one that's most advanced (and non-failed). + */ + void recomputeProgress() { + if (isComplete()) { + this.progress = 1; + } else if (failed) { + this.progress = 0; + } else { + double bestProgress = 0; + for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) { + String taskid = (String) it.next(); + TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + if (status.getRunState() == TaskStatus.SUCCEEDED) { + bestProgress = 1; + break; + } else if (status.getRunState() == TaskStatus.RUNNING) { + bestProgress = Math.max(bestProgress, status.getProgress()); + } + } + this.progress = bestProgress; + } + } + + ///////////////////////////////////////////////// + // "Action" methods that actually require the TIP + // to do something. + ///////////////////////////////////////////////// + /** + * Return whether this TIP has an NDFS cache-driven task + * to run at the given taskTracker. + */ + boolean hasTaskWithCacheHit(String taskTracker, TaskTrackerStatus tts) { + if (failed || isComplete() || recentTasks.size() > 0) { + return false; + } else { + try { + if (isMapTask()) { + NutchFileSystem fs = NutchFileSystem.get(nutchConf); + String hints[][] = fs.getFileCacheHints(split.getFile(), split.getStart(), split.getLength()); + for (int i = 0; i < hints.length; i++) { + for (int j = 0; j < hints[i].length; j++) { + if (hints[i][j].equals(tts.getHost())) { + return true; + } + } + } + } + } catch (IOException ie) { + } + return false; + } + } + /** + * Return whether this TIP has a non-speculative task to run + */ + boolean hasTask() { + if (failed || isComplete() || recentTasks.size() > 0) { + return false; + } else { + for (Iterator it = taskStatuses.values().iterator(); it.hasNext(); ) { + TaskStatus ts = (TaskStatus) it.next(); + if (ts.getRunState() == TaskStatus.RUNNING) { + return false; + } + } + return true; + } + } + /** + * Return whether the TIP has a speculative task to run. 
We + * only launch a speculative task if the current TIP is really + * far behind, and has been behind for a non-trivial amount of + * time. + */ + boolean hasSpeculativeTask(double averageProgress) { + // + // REMIND - mjc - these constants should be examined + // in more depth eventually... + // + if (isMapTask() && + (averageProgress - progress >= SPECULATIVE_GAP) && + (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) { + return true; + } + return false; + } + + /** + * Return a Task that can be sent to a TaskTracker for execution. + */ + public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) { + Task t = null; + if (hasTaskWithCacheHit(taskTracker, tts) || + hasTask() || + hasSpeculativeTask(avgProgress)) { + + String taskid = (String) usableTaskIds.first(); + usableTaskIds.remove(taskid); + + if (isMapTask()) { + t = new MapTask(jobFile, taskid, split); + } else { + String mapIdPredecessors[][] = new String[predecessors.length][]; + for (int i = 0; i < mapIdPredecessors.length; i++) { + mapIdPredecessors[i] = predecessors[i].getAllPossibleTaskIds(); + } + t = new ReduceTask(jobFile, taskid, mapIdPredecessors, partition); + } + t.setConf(nutchConf); + + recentTasks.add(taskid); + + // Ask JobTracker to note that the task exists + jobtracker.createTaskEntry(taskid, taskTracker, this); + } + return t; + } +}
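
A minimal, self-contained sketch (not part of the commit above) of the two decisions the new TaskInProgress makes: rolling several attempt statuses up into one TIP progress figure, and gating speculative execution on SPECULATIVE_GAP / SPECULATIVE_LAG. The Attempt class and method names are illustrative stand-ins, not the committed API; only the constants and the logic mirror the code above.

import java.util.*;

public class SpeculationSketch {

    // Constants copied from TaskInProgress above.
    static final double SPECULATIVE_GAP = 0.2;      // TIP must trail the job average by 20%
    static final long SPECULATIVE_LAG = 60 * 1000;  // ...and have been running for at least a minute

    // Illustrative stand-in for TaskStatus: one attempt of a TIP.
    static class Attempt {
        static final int RUNNING = 0, SUCCEEDED = 1, FAILED = 2;
        final int runState;
        final double progress;
        Attempt(int runState, double progress) {
            this.runState = runState;
            this.progress = progress;
        }
    }

    // Mirrors recomputeProgress(): the TIP reports the most advanced
    // non-failed attempt, and a single success pins it at 1.0.
    static double recomputeProgress(List<Attempt> attempts) {
        double best = 0;
        for (Attempt a : attempts) {
            if (a.runState == Attempt.SUCCEEDED) {
                return 1.0;
            } else if (a.runState == Attempt.RUNNING) {
                best = Math.max(best, a.progress);
            }
        }
        return best;
    }

    // Mirrors hasSpeculativeTask(): only map TIPs that are both far behind
    // the job-wide average and old enough get a second, speculative attempt.
    static boolean shouldSpeculate(boolean isMapTask, double tipProgress,
                                   double avgProgress, long startTime, long now) {
        return isMapTask
                && (avgProgress - tipProgress >= SPECULATIVE_GAP)
                && (now - startTime >= SPECULATIVE_LAG);
    }

    public static void main(String[] args) {
        List<Attempt> attempts = Arrays.asList(
                new Attempt(Attempt.FAILED, 0.40),
                new Attempt(Attempt.RUNNING, 0.15));
        double progress = recomputeProgress(attempts);              // 0.15: the failed attempt is ignored
        long startedTwoMinutesAgo = System.currentTimeMillis() - 2 * 60 * 1000;
        System.out.println(shouldSpeculate(true, progress, 0.60,
                startedTwoMinutesAgo, System.currentTimeMillis())); // true: 0.60 - 0.15 >= 0.2
    }
}

Note that a single succeeded attempt drives the TIP to completion even while sibling attempts are still running; per the comment in completed() above, those stragglers are closed when their tasktracker next reports in.
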
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java Thu Feb 2 09:58:38 2006 @@ -33,6 +33,7 @@ public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.mapred.TaskRunner"); + boolean killed = false; private Process process; private Task t; private TaskTracker tracker; @@ -51,7 +52,7 @@ /** Called to assemble this task's input. This method is run in the parent * process before the child is spawned. It should not execute user code, * only system code. */ - public void prepare() throws IOException {} + public boolean prepare() throws IOException {return true;} /** Called when this task's output is no longer needed. * This method is run in the parent process after the child exits. It should @@ -62,7 +63,9 @@ public final void run() { try { - prepare(); + if (! prepare()) { + return; + } String sep = System.getProperty("path.separator"); File workDir = new File(new File(t.getJobFile()).getParent(), "work"); @@ -72,7 +75,7 @@ // start with same classpath as parent process classPath.append(System.getProperty("java.class.path")); classPath.append(sep); - + JobConf job = new JobConf(t.getJobFile()); String jar = job.getJar(); if (jar != null) { // if jar exists, it into workDir @@ -158,6 +161,7 @@ if (process != null) { process.destroy(); } + killed = true; } /** Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java Thu Feb 2 09:58:38 2006 @@ -34,6 +34,7 @@ public static final int UNASSIGNED = 3; private String taskid; + private boolean isMap; private float progress; private int runState; private String diagnosticInfo; @@ -41,8 +42,9 @@ public TaskStatus() {} - public TaskStatus(String taskid, float progress, int runState, String diagnosticInfo, String stateString) { + public TaskStatus(String taskid, boolean isMap, float progress, int runState, String diagnosticInfo, String stateString) { this.taskid = taskid; + this.isMap = isMap; this.progress = progress; this.runState = runState; this.diagnosticInfo = diagnosticInfo; @@ -50,6 +52,7 @@ } public String getTaskId() { return taskid; } + public boolean getIsMap() { return isMap; } public float getProgress() { return progress; } public void setProgress(float progress) { this.progress = progress; } public int getRunState() { return runState; } @@ -64,6 +67,7 @@ ////////////////////////////////////////////// public void write(DataOutput out) throws IOException { UTF8.writeString(out, taskid); + out.writeBoolean(isMap); out.writeFloat(progress); out.writeInt(runState); UTF8.writeString(out, diagnosticInfo); @@ -72,6 +76,7 @@ public void readFields(DataInput in) throws IOException { this.taskid = UTF8.readString(in); + this.isMap = in.readBoolean(); this.progress = in.readFloat(); this.runState = in.readInt(); this.diagnosticInfo = 
UTF8.readString(in); Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java Thu Feb 2 09:58:38 2006 @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nutch.mapred; + package org.apache.nutch.mapred; import org.apache.nutch.fs.*; import org.apache.nutch.io.*; @@ -33,7 +33,6 @@ * @author Mike Cafarella *******************************************************/ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable { - private int maxCurrentTask; static final long WAIT_FOR_DONE = 3 * 1000; private long taskTimeout; @@ -66,6 +65,8 @@ private NutchConf fConf; private MapOutputFile mapOutputFile; + private int maxCurrentTasks; + /** * Start with the local machine name, and the default JobTracker */ @@ -77,9 +78,10 @@ * Start with the local machine name, and the addr of the target JobTracker */ public TaskTracker(InetSocketAddress jobTrackAddr, NutchConf conf) throws IOException { + maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2); + this.fConf = conf; this.jobTrackAddr = jobTrackAddr; - this.maxCurrentTask = conf.getInt("mapred.tasktracker.tasks.maximum", 2); this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000); this.mapOutputFile = new MapOutputFile(); this.mapOutputFile.setConf(conf); @@ -93,6 +95,7 @@ */ void initialize() throws IOException { this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000); + LOG.info("Starting tracker " + taskTrackerName); this.localHostname = InetAddress.getLocalHost().getHostName(); new JobConf(this.fConf).deleteLocalFiles(SUBDIR); @@ -108,7 +111,7 @@ // RPC initialization while (true) { try { - this.taskReportServer = RPC.getServer(this, this.taskReportPort, this.maxCurrentTask, false, this.fConf); + this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf); this.taskReportServer.start(); break; } catch (BindException e) { @@ -119,7 +122,7 @@ } while (true) { try { - this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, this.maxCurrentTask, false, this.fConf); + this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, maxCurrentTasks, false, this.fConf); this.mapOutputServer.start(); break; } catch (BindException e) { @@ -142,9 +145,14 @@ * clean. */ public synchronized void close() throws IOException { - // Kill running tasks - while (tasks.size() > 0) { - TaskInProgress tip = (TaskInProgress)tasks.get(tasks.firstKey()); + // + // Kill running tasks. Do this in a 2nd vector, called 'tasksToClose', + // because calling jobHasFinished() may result in an edit to 'tasks'. + // + TreeMap tasksToClose = new TreeMap(); + tasksToClose.putAll(tasks); + for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) { + TaskInProgress tip = (TaskInProgress) it.next(); tip.jobHasFinished(); } @@ -154,11 +162,21 @@ } catch (InterruptedException ie) { } - // Shutdown local RPC servers - if (taskReportServer != null) { - taskReportServer.stop(); - taskReportServer = null; - } + // + // Shutdown local RPC servers. 
Do them + // in parallel, as RPC servers can take a long + // time to shutdown. (They need to wait a full + // RPC timeout, which might be 10-30 seconds.) + // + new Thread() { + public void run() { + if (taskReportServer != null) { + taskReportServer.stop(); + taskReportServer = null; + } + } + }.start(); + if (mapOutputServer != null) { mapOutputServer.stop(); mapOutputServer = null; @@ -227,7 +245,7 @@ // // Check if we should create a new Task // - if (runningTasks.size() < this.maxCurrentTask) { + if (runningTasks.size() < maxCurrentTasks) { Task t = jobClient.pollForNewTask(taskTrackerName); if (t != null) { TaskInProgress tip = new TaskInProgress(t, this.fConf); @@ -255,9 +273,16 @@ } // + // Check for any Tasks that should be killed, even if + // the containing Job is still ongoing. (This happens + // with speculative execution, when one version of the + // task finished before another + // + + // // Check for any Tasks whose job may have ended // - String toCloseId = jobClient.pollForClosedTask(taskTrackerName); + String toCloseId = jobClient.pollForTaskWithClosedJob(taskTrackerName); if (toCloseId != null) { synchronized (this) { TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId); @@ -288,7 +313,8 @@ staleState = true; } } catch (Exception ex) { - LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. ex=" + ex + " Retrying..."); + ex.printStackTrace(); + LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. Retrying..."); try { Thread.sleep(5000); } catch (InterruptedException ie) { @@ -373,7 +399,7 @@ /** */ public TaskStatus createStatus() { - TaskStatus status = new TaskStatus(task.getTaskId(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? "" : stateString); + TaskStatus status = new TaskStatus(task.getTaskId(), task.isMapTask(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? 
"" : stateString); if (diagnosticInfo.length() > 0) { diagnosticInfo = new StringBuffer(); } Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java Thu Feb 2 09:58:38 2006 @@ -77,6 +77,28 @@ public Iterator taskReports() { return taskReports.iterator(); } + + /** + * Return the current MapTask count + */ + public int countMapTasks() { + int mapCount = 0; + for (Iterator it = taskReports.iterator(); it.hasNext(); ) { + TaskStatus ts = (TaskStatus) it.next(); + if (ts.getIsMap()) { + mapCount++; + } + } + return mapCount; + } + + /** + * Return the current ReduceTask count + */ + public int countReduceTasks() { + return taskReports.size() - countMapTasks(); + } + /** */ public long getLastSeen() { Added: lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java?rev=374443&view=auto ============================================================================== --- lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java (added) +++ lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java Thu Feb 2 09:58:38 2006 @@ -0,0 +1,317 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.mapred; + +import org.apache.nutch.fs.*; +import org.apache.nutch.io.*; +import org.apache.nutch.util.*; +import org.apache.nutch.mapred.lib.*; + +import java.io.*; +import java.util.*; +import java.math.*; + +/********************************************************** + * MapredLoadTest generates a bunch of work that exercises + * a Nutch Map-Reduce system (and NDFS, too). It goes through + * the following steps: + * + * 1) Take inputs 'range' and 'counts'. + * 2) Generate 'counts' random integers between 0 and range-1. + * 3) Create a file that lists each integer between 0 and range-1, + * and lists the number of times that integer was generated. + * 4) Emit a (very large) file that contains all the integers + * in the order generated. + * 5) After the file has been generated, read it back and count + * how many times each int was generated. + * 6) Compare this big count-map against the original one. If + * they match, then SUCCESS! Otherwise, FAILURE! + * + * OK, that's how we can think about it. What are the map-reduce + * steps that get the job done? + * + * 1) In a non-mapred thread, take the inputs 'range' and 'counts'. + * 2) In a non-mapread thread, generate the answer-key and write to disk. + * 3) In a mapred job, divide the answer key into K jobs. 
+ * 4) A mapred 'generator' task consists of K map jobs. Each reads + * an individual "sub-key", and generates integers according to + * to it (though with a random ordering). + * 5) The generator's reduce task agglomerates all of those files + * into a single one. + * 6) A mapred 'reader' task consists of M map jobs. The output + * file is cut into M pieces. Each of the M jobs counts the + * individual ints in its chunk and creates a map of all seen ints. + * 7) A mapred job integrates all the count files into a single one. + * + **********************************************************/ +public class MapredLoadTest { + static class RandomGenMapper implements Mapper { + Random r = new Random(); + public void configure(JobConf job) { + } + + public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException { + int randomVal = ((IntWritable) key).get(); + int randomCount = ((IntWritable) val).get(); + + for (int i = 0; i < randomCount; i++) { + out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal)); + } + } + } + static class RandomGenReducer implements Reducer { + public void configure(JobConf job) { + } + + public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException { + int keyint = ((IntWritable) key).get(); + while (it.hasNext()) { + int val = ((IntWritable) it.next()).get(); + out.collect(new UTF8("" + val), new UTF8("")); + } + } + } + static class RandomCheckMapper implements Mapper { + public void configure(JobConf job) { + } + + public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException { + long pos = ((LongWritable) key).get(); + UTF8 str = (UTF8) val; + + out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1)); + } + } + static class RandomCheckReducer implements Reducer { + public void configure(JobConf job) { + } + + public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException { + int keyint = ((IntWritable) key).get(); + int count = 0; + while (it.hasNext()) { + it.next(); + count++; + } + out.collect(new IntWritable(keyint), new IntWritable(count)); + } + } + + int range; + int counts; + Random r = new Random(); + NutchConf nutchConf; + + /** + * MapredLoadTest + */ + public MapredLoadTest(int range, int counts, NutchConf nutchConf) throws IOException { + this.range = range; + this.counts = counts; + this.nutchConf = nutchConf; + } + + /** + * + */ + public void launch() throws IOException { + // + // Generate distribution of ints. This is the answer key. + // + int countsToGo = counts; + int dist[] = new int[range]; + for (int i = 0; i < range; i++) { + double avgInts = (1.0 * countsToGo) / (range - i); + dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian()))); + countsToGo -= dist[i]; + } + if (countsToGo > 0) { + dist[dist.length-1] += countsToGo; + } + + // + // Write the answer key to a file. 
+ // + NutchFileSystem fs = NutchFileSystem.get(nutchConf); + File testdir = new File("mapred.loadtest"); + fs.mkdirs(testdir); + + File randomIns = new File(testdir, "genins"); + fs.mkdirs(randomIns); + + File answerkey = new File(randomIns, "answer.key"); + SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey.getPath(), IntWritable.class, IntWritable.class); + try { + for (int i = 0; i < range; i++) { + out.append(new IntWritable(i), new IntWritable(dist[i])); + } + } finally { + out.close(); + } + + // + // Now we need to generate the random numbers according to + // the above distribution. + // + // We create a lot of map tasks, each of which takes at least + // one "line" of the distribution. (That is, a certain number + // X is to be generated Y number of times.) + // + // A map task emits Y key/val pairs. The val is X. The key + // is a randomly-generated number. + // + // The reduce task gets its input sorted by key. That is, sorted + // in random order. It then emits a single line of text that + // for the given values. It does not emit the key. + // + // Because there's just one reduce task, we emit a single big + // file of random numbers. + // + File randomOuts = new File(testdir, "genouts"); + fs.mkdirs(randomOuts); + + + JobConf genJob = new JobConf(nutchConf); + genJob.setInputDir(randomIns); + genJob.setInputKeyClass(IntWritable.class); + genJob.setInputValueClass(IntWritable.class); + genJob.setInputFormat(SequenceFileInputFormat.class); + genJob.setMapperClass(RandomGenMapper.class); + + genJob.setOutputDir(randomOuts); + genJob.setOutputKeyClass(IntWritable.class); + genJob.setOutputValueClass(IntWritable.class); + genJob.setOutputFormat(TextOutputFormat.class); + genJob.setReducerClass(RandomGenReducer.class); + genJob.setNumReduceTasks(1); + + JobClient.runJob(genJob); + + // + // Next, we read the big file in and regenerate the + // original map. + // + // We have many map tasks, each of which read at least one + // of the output numbers. For each number read in, the + // map task emits a key/value pair where the key is the + // number and the value is "1". + // + // We have a single reduce task, which receives its input + // sorted by the key emitted above. For each key, there will + // be a certain number of "1" values. The reduce task sums + // these values to compute how many times the given key was + // emitted. + // + // The reduce task then emits a key/val pair where the key + // is the number in question, and the value is the number of + // times the key was emitted. This is the same format as the + // original answer key (except that numbers emitted zero times + // will not appear in the regenerated key.) + // + File finalOuts = new File(testdir, "finalouts"); + fs.mkdirs(finalOuts); + JobConf checkJob = new JobConf(nutchConf); + checkJob.setInputDir(randomOuts); + checkJob.setInputKeyClass(LongWritable.class); + checkJob.setInputValueClass(UTF8.class); + checkJob.setInputFormat(TextInputFormat.class); + checkJob.setMapperClass(RandomCheckMapper.class); + + checkJob.setOutputDir(finalOuts); + checkJob.setOutputKeyClass(IntWritable.class); + checkJob.setOutputValueClass(IntWritable.class); + checkJob.setOutputFormat(SequenceFileOutputFormat.class); + checkJob.setReducerClass(RandomCheckReducer.class); + checkJob.setNumReduceTasks(1); + + JobClient.runJob(checkJob); + + // + // Finally, we compare the reconstructed answer key with the + // original one. Remember, we need to ignore zero-count items + // in the original key. 
+ // + boolean success = true; + File recomputedkey = new File(finalOuts, "part-00000"); + SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey.getPath(), nutchConf); + int totalseen = 0; + try { + IntWritable key = new IntWritable(); + IntWritable val = new IntWritable(); + for (int i = 0; i < range; i++) { + if (dist[i] == 0) { + continue; + } + if (! in.next(key, val)) { + System.err.println("Cannot read entry " + i); + success = false; + break; + } else { + if ( !((key.get() == i ) && (val.get() == dist[i]))) { + System.err.println("Mismatch! Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]); + success = false; + } + totalseen += val.get(); + } + } + if (success) { + if (in.next(key, val)) { + System.err.println("Unnecessary lines in recomputed key!"); + success = false; + } + } + } finally { + in.close(); + } + int originalTotal = 0; + for (int i = 0; i < dist.length; i++) { + originalTotal += dist[i]; + } + System.out.println("Original sum: " + originalTotal); + System.out.println("Recomputed sum: " + totalseen); + + // + // Write to "results" whether the test succeeded or not. + // + File resultFile = new File(testdir, "results"); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile))); + try { + bw.write("Success=" + success + "\n"); + System.out.println("Success=" + success); + } finally { + bw.close(); + } + } + + /** + * Launches all the tasks in order. + */ + public static void main(String[] argv) throws Exception { + if (argv.length < 2) { + System.err.println("Usage: MapredLoadTest <range> <counts>"); + System.err.println(); + System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>"); + return; + } + + int i = 0; + int range = Integer.parseInt(argv[i++]); + int counts = Integer.parseInt(argv[i++]); + + MapredLoadTest mlt = new MapredLoadTest(range, counts, new NutchConf()); + mlt.launch(); + } +} Modified: lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp (original) +++ lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp Thu Feb 2 09:58:38 2006 @@ -9,21 +9,28 @@ <% String jobid = request.getParameter("jobid"); JobTracker tracker = JobTracker.getTracker(); - JobTracker.JobInProgress job = (JobTracker.JobInProgress) tracker.getJob(jobid); + JobInProgress job = (JobInProgress) tracker.getJob(jobid); JobProfile profile = (job != null) ? (job.getProfile()) : null; JobStatus status = (job != null) ? (job.getStatus()) : null; - Vector mapTaskReports[] = tracker.getMapTaskReport(jobid); - Vector reduceTaskReports[] = tracker.getReduceTaskReport(jobid); + Vector mapTaskReports[] = (job != null) ? tracker.getMapTaskReport(jobid) : null; + Vector reduceTaskReports[] = (job != null) ? 
tracker.getReduceTaskReport(jobid) : null; %> <html> <title>Nutch MapReduce Job Details</title> <body> +<% + if (job == null) { + %> + No job found<br> + <% + } else { + %> <h1>Job '<%=jobid%>'</h1> <b>Job File:</b> <%=profile.getJobFile()%><br> -<b>Start time:</b> <%= new Date(job.getStartTime())%><br> +<b>The job started at:</b> <%= new Date(job.getStartTime())%><br> <% if (status.getRunState() == JobStatus.RUNNING) { out.print("The job is still running.<br>\n"); @@ -38,19 +45,28 @@ <h2>Map Tasks</h2> <center> <table border=2 cellpadding="5" cellspacing="2"> - <tr><td align="center">Map Task Id</td><td>Pct Complete</td><td>State</td><td>Diagnostic Text</td></tr> + <tr><td align="center">Map Task Id</td><td>Pct Complete</td><td>Diagnostic Data</td></tr> <% for (int i = 0; i < mapTaskReports.length; i++) { Vector v = mapTaskReports[i]; - out.print("<tr><td>" + v.elementAt(0) + "</td><td>" + v.elementAt(1) + "</td><td>" + v.elementAt(2) + "</td>"); - if (v.size() == 3) { - out.print("<td></td>"); - } else { - for (int j = 3; j < v.size(); j++) { - out.print("<td>" + v.elementAt(j) + "</td>"); + String tipid = (String) v.elementAt(0); + String progress = (String) v.elementAt(1); + int diagnosticSize = ((Integer) v.elementAt(2)).intValue(); + + out.print("<tr><td>" + tipid + "</td><td>" + progress + "</td><td>"); + for (int j = 0; j < diagnosticSize; j++) { + Vector taskData = (Vector) v.elementAt(3 + ((2 * j))); + String taskStateString = (String) v.elementAt(3 + ((2 * j) + 1)); + out.print(taskStateString); + out.print("<b>"); + + for (Iterator it2 = taskData.iterator(); it2.hasNext(); ) { + out.print("" + it2.next()); + out.println("<b>"); } } + out.print("</td>"); out.print("</tr>\n"); } %> @@ -62,25 +78,36 @@ <h2>Reduce Tasks</h2> <center> <table border=2 cellpadding="5" cellspacing="2"> - <tr><td align="center">Reduce Task Id</td><td>Pct Complete</td><td>State</td><td>Diagnostic Text</td></tr> + <tr><td align="center">Reduce Task Id</td><td>Pct Complete</td><td>Diagnostic Data</td></tr> <% for (int i = 0; i < reduceTaskReports.length; i++) { Vector v = reduceTaskReports[i]; - out.print("<tr><td>" + v.elementAt(0) + "</td><td>" + v.elementAt(1) + "</td><td>" + v.elementAt(2) + "</td>"); - if (v.size() == 3) { - out.print("<td></td>"); - } else { - for (int j = 3; j < v.size(); j++) { - out.print("<td>" + v.elementAt(j) + "</td>"); + String tipid = (String) v.elementAt(0); + String progress = (String) v.elementAt(1); + int diagnosticSize = ((Integer) v.elementAt(2)).intValue(); + + out.print("<tr><td>" + tipid + "</td><td>" + progress + "</td><td>"); + for (int j = 0; j < diagnosticSize; j++) { + Vector taskData = (Vector) v.elementAt(3 + ((2 * j))); + String taskStateString = (String) v.elementAt(3 + ((2 * j) + 1)); + out.print(taskStateString); + out.print("<b>"); + + for (Iterator it2 = taskData.iterator(); it2.hasNext(); ) { + out.print("" + it2.next()); + out.println("<b>"); } } + out.print("</td>"); out.print("</tr>\n"); } %> </table> </center> - + <% + } +%> <hr> <a href="/jobtracker.jsp">Go back to JobTracker</a><br> Modified: lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp?rev=374443&r1=374442&r2=374443&view=diff ============================================================================== --- lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp (original) +++ lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp Thu Feb 2 09:58:38 2006 @@ -46,22 +46,22 @@ 
out.print("<tr><td align=\"center\" colspan=\"8\"><b>" + label + " Jobs </b></td></tr>\n"); if (jobs.size() > 0) { - out.print("<tr><td><b>Jobid</b></td><td><b>% complete</b></td><td><b>Required maps</b></td><td><b>maps attempted</b></td><td><b>maps completed</b></td><td><b>Required reduces</b></td><td><b>reduces attempted</b></td><td><b>reduces completed</b></td></tr>\n"); + out.print("<tr><td><b>Jobid</b></td><td><b>% complete</b></td><td><b>Required maps</b></td><td><b>maps completed</b></td><td><b>Required reduces</b></td><td><b>reduces completed</b></td></tr>\n"); for (Iterator it = jobs.iterator(); it.hasNext(); ) { - JobTracker.JobInProgress job = (JobTracker.JobInProgress) it.next(); + JobInProgress job = (JobInProgress) it.next(); JobProfile profile = job.getProfile(); JobStatus status = job.getStatus(); String jobid = profile.getJobId(); - float completedRatio = (100 * job.completedRatio()); + double completedRatio = (0.5 * (100 * status.mapProgress())) + + (0.5 * (100 * status.reduceProgress())); + int desiredMaps = job.desiredMaps(); - int attemptedMaps = job.attemptedMaps(); - int completedMaps = job.completedMaps(); int desiredReduces = job.desiredReduces(); - int attemptedReduces = job.attemptedReduces(); - int completedReduces = job.completedReduces(); + int completedMaps = job.finishedMaps(); + int completedReduces = job.finishedReduces(); - out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + jobid + "</a></td><td>" + completedRatio + "%</td><td>" + desiredMaps + "</td><td>" + attemptedMaps + "</td><td>" + completedMaps + "</td><td>" + desiredReduces + "</td><td>" + attemptedReduces + "</td><td> " + completedReduces + "</td></tr>\n"); + out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + jobid + "</a></td><td>" + completedRatio + "%</td><td>" + desiredMaps + "</td><td>" + completedMaps + "</td><td>" + desiredReduces + "</td><td> " + completedReduces + "</td></tr>\n"); } } else { out.print("<tr><td align=\"center\" colspan=\"8\"><i>none</i></td></tr>\n"); ------------------------------------------------------- This SF.net email is sponsored by: Splunk Inc. Do you grep through log files for problems? Stop! Download the new AJAX search engine that makes searching your log files as easy as surfing the web. DOWNLOAD SPLUNK! http://sel.as-us.falkag.net/sel?cmd=lnk&kid=103432&bid=230486&dat=121642 _______________________________________________ Nutch-cvs mailing list Nutch-cvs@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/nutch-cvs