Author: cutting
Date: Tue Oct 4 15:01:28 2005
New Revision: 294929
URL: http://svn.apache.org/viewcvs?rev=294929&view=rev
Log:
Permit local files to be stored on multiple devices.
Modified:
lucene/nutch/branches/mapred/conf/nutch-default.xml
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Modified: lucene/nutch/branches/mapred/conf/nutch-default.xml
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/conf/nutch-default.xml?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/conf/nutch-default.xml (original)
+++ lucene/nutch/branches/mapred/conf/nutch-default.xml Tue Oct 4 15:01:28 2005
@@ -390,7 +390,8 @@
<name>mapred.local.dir</name>
<value>/tmp/nutch/mapred/local</value>
<description>The local directory where MapReduce stores intermediate
- data files.
+ data files. May be a space- or comma- separated list of
+ directories on different devices in order to spread disk i/o.
</description>
</property>
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java Tue Oct 4 15:01:28 2005
@@ -72,8 +72,8 @@
public RecordWriter getRecordWriter(final NutchFileSystem fs, JobConf job,
String name) throws IOException {
final File perm = new File(job.getOutputDir(), name);
- final File temp = new File(job.getLocalDir(), "index-"
- +Integer.toString(new Random().nextInt()));
+ final File temp =
+ job.getLocalFile("index","_"+Integer.toString(new Random().nextInt()));
fs.delete(perm); // delete old, if any
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java Tue Oct 4 15:01:28 2005
@@ -31,6 +31,7 @@
import java.util.Collections;
import org.apache.nutch.fs.NutchFileSystem;
+import org.apache.nutch.fs.FileUtil;
import org.apache.nutch.util.NutchConf;
import org.apache.nutch.io.Writable;
@@ -85,9 +86,34 @@
"/tmp/nutch/mapred/system"));
}
- public static File getLocalDir() {
- return new File(NutchConf.get().get("mapred.local.dir",
- "/tmp/nutch/mapred/local"));
+ public static String[] getLocalDirs() throws IOException {
+ return NutchConf.get().getStrings("mapred.local.dir");
+ }
+
+ public static void deleteLocalFiles() throws IOException {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileUtil.fullyDelete(new File(localDirs[i]));
+ }
+ }
+
+ public static void deleteLocalFiles(String subdir) throws IOException {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileUtil.fullyDelete(new File(localDirs[i], subdir));
+ }
+ }
+
+ /** Constructs a local file name. Files are distributed among configured
+ * local directories.*/
+ public static File getLocalFile(String subdir, String name)
+ throws IOException {
+ String[] localDirs = getLocalDirs();
+ String path = subdir + File.separator + name;
+ int i = (path.hashCode() & Integer.MAX_VALUE) % localDirs.length;
+ File file = new File(localDirs[i], path);
+ file.getParentFile().mkdirs();
+ return file;
}
public void setInputDir(File dir) { set("mapred.input.dir", dir); }
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java Tue Oct 4 15:01:28 2005
@@ -195,7 +195,7 @@
// Some jobs are stored in a local system directory. We can delete
// the files when we're done with the job.
- File localDir;
+ static final String SUBDIR = "jobTracker";
NutchFileSystem fs;
File systemDir;
@@ -211,9 +211,7 @@
fs.mkdirs(systemDir);
// Same with 'localDir' except it's always on the local disk.
- this.localDir = JobConf.getLocalDir();
- FileUtil.fullyDelete(localDir);
- this.localDir.mkdirs();
+ JobConf.deleteLocalFiles(SUBDIR);
// Set ports, start RPC servers, etc.
InetSocketAddress addr = getAddress(conf);
@@ -580,7 +578,7 @@
this.profile = new JobProfile(jobid, jobFile, url);
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.RUNNING);
- this.localJobFile = new File(localDir, jobid+".xml");
+ this.localJobFile = JobConf.getLocalFile(SUBDIR, jobid+".xml");
fs.copyToLocalFile(new File(jobFile), localJobFile);
JobConf jd = new JobConf(localJobFile);
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java Tue Oct 4 15:01:28 2005
@@ -45,7 +45,7 @@
this.file = file;
this.id = "job_" + newId();
- File localFile = new File(JobConf.getLocalDir(), id+".xml");
+ File localFile = JobConf.getLocalFile("localRunner", id+".xml");
fs.copyToLocalFile(new File(file), localFile);
this.job = new JobConf(localFile);
@@ -63,6 +63,7 @@
FileSplit[] splits = job.getInputFormat().getSplits(fs, job, 1);
// run a map task for each split
+ job.setNumReduceTasks(1); // force a single reduce task
for (int i = 0; i < splits.length; i++) {
mapIds.add("map_" + newId());
MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
@@ -70,7 +71,7 @@
}
// move map output to reduce input
- String reduceId = "_" + newId();
+ String reduceId = "reduce_" + newId();
for (int i = 0; i < mapIds.size(); i++) {
String mapId = (String)mapIds.get(i);
File mapOut = MapOutputFile.getOutputFile(mapId, 0);
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java Tue Oct 4 15:01:28 2005
@@ -25,8 +25,6 @@
/** A local file to be transferred via the {@link MapOutputProtocol}. */
public class MapOutputFile implements Writable {
- private static final String LOCAL_DIR = JobConf.getLocalDir().toString();
-
private String mapTaskId;
private String reduceTaskId;
private int partition;
@@ -46,23 +44,23 @@
* @param mapTaskId a map task id
* @param partition a reduce partition
*/
- public static File getOutputFile(String mapTaskId, int partition) {
- File taskDir = new File(LOCAL_DIR, mapTaskId);
- return new File(taskDir, "part-"+partition+".out");
+ public static File getOutputFile(String mapTaskId, int partition)
+ throws IOException {
+ return JobConf.getLocalFile(mapTaskId, "part-"+partition+".out");
}
/** Create a local reduce input file name.
* @param mapTaskId a map task id
* @param reduceTaskId a reduce task id
*/
- public static File getInputFile(String mapTaskId, String reduceTaskId) {
- File taskDir = new File(LOCAL_DIR, reduceTaskId);
- return new File(taskDir, mapTaskId+".out");
+ public static File getInputFile(String mapTaskId, String reduceTaskId)
+ throws IOException {
+ return JobConf.getLocalFile(reduceTaskId, mapTaskId+".out");
}
/** Removes all of the files related to a task. */
public static void removeAll(String taskId) throws IOException {
- FileUtil.fullyDelete(new File(LOCAL_DIR, taskId));
+ JobConf.deleteLocalFiles(taskId);
}
/**
@@ -70,7 +68,7 @@
* startup, to remove any leftovers from previous run.
*/
public static void cleanupStorage() throws IOException {
- FileUtil.fullyDelete(new File(LOCAL_DIR));
+ JobConf.deleteLocalFiles();
}
/** Construct a file for transfer. */
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Tue Oct 4 15:01:28 2005
@@ -27,8 +27,6 @@
/** A Reduce task. */
public class ReduceTask extends Task {
- private static final String LOCAL_DIR = JobConf.getLocalDir().toString();
-
private String[] mapTaskIds;
private int partition;
private boolean sortComplete;
@@ -164,8 +162,7 @@
copyPhase.complete(); // copy is already complete
// open a file to collect map output
- File taskDir = new File(LOCAL_DIR, getTaskId());
- String file = new File(taskDir, "all.in").toString();
+ String file = job.getLocalFile(getTaskId(), "all.1").toString();
SequenceFile.Writer writer =
new SequenceFile.Writer(lfs, file, keyClass, valueClass);
try {
@@ -221,7 +218,7 @@
};
sortProgress.setName("Sort progress reporter for task "+getTaskId());
- String sortedFile = file+".sorted";
+ String sortedFile = job.getLocalFile(getTaskId(), "all.2").toString();
WritableComparator comparator = job.getOutputKeyComparator();
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Tue Oct 4 15:01:28 2005
@@ -63,7 +63,7 @@
static Random r = new Random();
NutchFileSystem fs = null;
- File localDir = null;
+ static final String SUBDIR = "taskTracker";
/**
* Start with the local machine name, and the default JobTracker
@@ -77,7 +77,6 @@
*/
public TaskTracker(InetSocketAddress jobTrackAddr) throws IOException {
this.jobTrackAddr = jobTrackAddr;
- this.localDir = new File(JobConf.getLocalDir(), "tracker");
initialize();
}
@@ -90,10 +89,7 @@
this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000);
this.localHostname = InetAddress.getLocalHost().getHostName();
- if (localDir.exists()) {
- FileUtil.fullyDelete(localDir);
- }
- localDir.mkdirs();
+ JobConf.deleteLocalFiles(SUBDIR);
// Clear out state tables
this.tasks = new TreeMap();
@@ -296,7 +292,6 @@
///////////////////////////////////////////////////////
class TaskInProgress {
Task task;
- File localTaskDir;
float progress;
int runstate;
String stateString = "";
@@ -311,11 +306,7 @@
public TaskInProgress(Task task) throws IOException {
this.task = task;
this.lastProgressReport = System.currentTimeMillis();
- this.localTaskDir = new File(localDir, task.getTaskId());
- if (localTaskDir.exists()) {
- FileUtil.fullyDelete(localTaskDir);
- }
- this.localTaskDir.mkdirs();
+ JobConf.deleteLocalFiles(SUBDIR+File.separator+task.getTaskId());
localizeTask(task);
}
@@ -324,8 +315,10 @@
* So here, edit the Task's fields appropriately.
*/
void localizeTask(Task t) throws IOException {
- File localJobFile = new File(localTaskDir, "job.xml");
- File localJarFile = new File(localTaskDir, "job.jar");
+ File localJobFile =
+ JobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.xml");
+ File localJarFile =
+ JobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.jar");
String jobFile = t.getJobFile();
fs.copyToLocalFile(new File(jobFile), localJobFile);
@@ -487,7 +480,7 @@
runner.close();
} catch (IOException ie) {
}
- FileUtil.fullyDelete(localTaskDir);
+ JobConf.deleteLocalFiles(SUBDIR+File.separator+task.getTaskId());
}
}