Author: cutting
Date: Tue Oct  4 15:01:28 2005
New Revision: 294929

URL: http://svn.apache.org/viewcvs?rev=294929&view=rev
Log:
Permit local files to be stored on multiple devices.

Modified:
    lucene/nutch/branches/mapred/conf/nutch-default.xml
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java

Modified: lucene/nutch/branches/mapred/conf/nutch-default.xml
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/conf/nutch-default.xml?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/conf/nutch-default.xml (original)
+++ lucene/nutch/branches/mapred/conf/nutch-default.xml Tue Oct  4 15:01:28 2005
@@ -390,7 +390,8 @@
   <name>mapred.local.dir</name>
   <value>/tmp/nutch/mapred/local</value>
   <description>The local directory where MapReduce stores intermediate
-  data files.
+  data files.  May be a space- or comma- separated list of
+  directories on different devices in order to spread disk i/o.
   </description>
 </property>
 

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java Tue Oct  4 15:01:28 2005
@@ -72,8 +72,8 @@
     public RecordWriter getRecordWriter(final NutchFileSystem fs, JobConf job,
                                         String name) throws IOException {
       final File perm = new File(job.getOutputDir(), name);
-      final File temp = new File(job.getLocalDir(), "index-"
-                                 +Integer.toString(new Random().nextInt()));
+      final File temp =
+        job.getLocalFile("index","_"+Integer.toString(new Random().nextInt()));
 
       fs.delete(perm);                            // delete old, if any
 

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java Tue Oct  4 15:01:28 2005
@@ -31,6 +31,7 @@
 import java.util.Collections;
 
 import org.apache.nutch.fs.NutchFileSystem;
+import org.apache.nutch.fs.FileUtil;
 import org.apache.nutch.util.NutchConf;
 
 import org.apache.nutch.io.Writable;
@@ -85,9 +86,34 @@
                                         "/tmp/nutch/mapred/system"));
   }
 
-  public static File getLocalDir() {
-    return new File(NutchConf.get().get("mapred.local.dir",
-                                        "/tmp/nutch/mapred/local"));
+  public static String[] getLocalDirs() throws IOException {
+    return NutchConf.get().getStrings("mapred.local.dir");
+  }
+
+  public static void deleteLocalFiles() throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileUtil.fullyDelete(new File(localDirs[i]));
+    }
+  }
+
+  public static void deleteLocalFiles(String subdir) throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileUtil.fullyDelete(new File(localDirs[i], subdir));
+    }
+  }
+
+  /** Constructs a local file name.  Files are distributed among configured
+   * local directories.*/
+  public static File getLocalFile(String subdir, String name)
+    throws IOException {
+    String[] localDirs = getLocalDirs();
+    String path = subdir + File.separator + name;
+    int i = (path.hashCode() & Integer.MAX_VALUE) % localDirs.length;
+    File file = new File(localDirs[i], path);
+    file.getParentFile().mkdirs();
+    return file;
   }
 
   public void setInputDir(File dir) { set("mapred.input.dir", dir); }

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java Tue Oct  4 15:01:28 2005
@@ -195,7 +195,7 @@
 
     // Some jobs are stored in a local system directory.  We can delete
     // the files when we're done with the job.
-    File localDir;
+    static final String SUBDIR = "jobTracker";
     NutchFileSystem fs;
     File systemDir;
 
@@ -211,9 +211,7 @@
         fs.mkdirs(systemDir);
 
         // Same with 'localDir' except it's always on the local disk.
-        this.localDir = JobConf.getLocalDir();
-        FileUtil.fullyDelete(localDir);
-        this.localDir.mkdirs();
+        JobConf.deleteLocalFiles(SUBDIR);
 
         // Set ports, start RPC servers, etc.
         InetSocketAddress addr = getAddress(conf);
@@ -580,7 +578,7 @@
             this.profile = new JobProfile(jobid, jobFile, url);
             this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.RUNNING);
 
-            this.localJobFile = new File(localDir, jobid+".xml");
+            this.localJobFile = JobConf.getLocalFile(SUBDIR, jobid+".xml");
             fs.copyToLocalFile(new File(jobFile), localJobFile);
 
             JobConf jd = new JobConf(localJobFile);

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java Tue Oct  4 15:01:28 2005
@@ -45,7 +45,7 @@
       this.file = file;
       this.id = "job_" + newId();
 
-      File localFile = new File(JobConf.getLocalDir(), id+".xml");
+      File localFile = JobConf.getLocalFile("localRunner", id+".xml");
       fs.copyToLocalFile(new File(file), localFile);
       this.job = new JobConf(localFile);
 
@@ -63,6 +63,7 @@
         FileSplit[] splits = job.getInputFormat().getSplits(fs, job, 1);
 
         // run a map task for each split
+        job.setNumReduceTasks(1);                 // force a single reduce task
         for (int i = 0; i < splits.length; i++) {
           mapIds.add("map_" + newId());
           MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
@@ -70,7 +71,7 @@
         }
 
         // move map output to reduce input
-        String reduceId = "_" + newId();
+        String reduceId = "reduce_" + newId();
         for (int i = 0; i < mapIds.size(); i++) {
           String mapId = (String)mapIds.get(i);
           File mapOut = MapOutputFile.getOutputFile(mapId, 0);

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java Tue Oct  4 15:01:28 2005
@@ -25,8 +25,6 @@
 
 /** A local file to be transferred via the {@link MapOutputProtocol}. */
 public class MapOutputFile implements Writable {
-  private static final String LOCAL_DIR = JobConf.getLocalDir().toString();
-
   private String mapTaskId;
   private String reduceTaskId;
   private int partition;
@@ -46,23 +44,23 @@
    * @param mapTaskId a map task id
    * @param partition a reduce partition
    */
-  public static File getOutputFile(String mapTaskId, int partition) {
-    File taskDir = new File(LOCAL_DIR, mapTaskId);
-    return new File(taskDir, "part-"+partition+".out");
+  public static File getOutputFile(String mapTaskId, int partition)
+    throws IOException {
+    return JobConf.getLocalFile(mapTaskId, "part-"+partition+".out");
   }
 
   /** Create a local reduce input file name.
    * @param mapTaskId a map task id
    * @param reduceTaskId a reduce task id
    */
-  public static File getInputFile(String mapTaskId, String reduceTaskId) {
-    File taskDir = new File(LOCAL_DIR, reduceTaskId);
-    return new File(taskDir, mapTaskId+".out");
+  public static File getInputFile(String mapTaskId, String reduceTaskId)
+    throws IOException {
+    return JobConf.getLocalFile(reduceTaskId, mapTaskId+".out");
   }
 
   /** Removes all of the files related to a task. */
   public static void removeAll(String taskId) throws IOException {
-    FileUtil.fullyDelete(new File(LOCAL_DIR, taskId));
+    JobConf.deleteLocalFiles(taskId);
   }
 
   /** 
@@ -70,7 +68,7 @@
    * startup, to remove any leftovers from previous run.
    */
   public static void cleanupStorage() throws IOException {
-    FileUtil.fullyDelete(new File(LOCAL_DIR));
+    JobConf.deleteLocalFiles();
   }
 
   /** Construct a file for transfer. */

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Tue Oct  4 15:01:28 2005
@@ -27,8 +27,6 @@
 
 /** A Reduce task. */
 public class ReduceTask extends Task {
-  private static final String LOCAL_DIR = JobConf.getLocalDir().toString();
-
   private String[] mapTaskIds;
   private int partition;
   private boolean sortComplete;
@@ -164,8 +162,7 @@
     copyPhase.complete();                         // copy is already complete
 
     // open a file to collect map output
-    File taskDir = new File(LOCAL_DIR, getTaskId());
-    String file = new File(taskDir, "all.in").toString();
+    String file = job.getLocalFile(getTaskId(), "all.1").toString();
     SequenceFile.Writer writer =
       new SequenceFile.Writer(lfs, file, keyClass, valueClass);
     try {
@@ -221,7 +218,7 @@
       };
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
 
-    String sortedFile = file+".sorted";
+    String sortedFile = job.getLocalFile(getTaskId(), "all.2").toString();
 
     WritableComparator comparator = job.getOutputKeyComparator();
     

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=294929&r1=294928&r2=294929&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Tue Oct  4 15:01:28 2005
@@ -63,7 +63,7 @@
 
     static Random r = new Random();
     NutchFileSystem fs = null;
-    File localDir = null;
+    static final String SUBDIR = "taskTracker";
 
     /**
      * Start with the local machine name, and the default JobTracker
@@ -77,7 +77,6 @@
      */
     public TaskTracker(InetSocketAddress jobTrackAddr) throws IOException {
         this.jobTrackAddr = jobTrackAddr;
-        this.localDir = new File(JobConf.getLocalDir(), "tracker");
         initialize();
     }
 
@@ -90,10 +89,7 @@
         this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000);
         this.localHostname = InetAddress.getLocalHost().getHostName();
 
-        if (localDir.exists()) {
-            FileUtil.fullyDelete(localDir);
-        }
-        localDir.mkdirs();
+        JobConf.deleteLocalFiles(SUBDIR);
 
         // Clear out state tables
         this.tasks = new TreeMap();
@@ -296,7 +292,6 @@
     ///////////////////////////////////////////////////////
     class TaskInProgress {
         Task task;
-        File localTaskDir;
         float progress;
         int runstate;
         String stateString = "";
@@ -311,11 +306,7 @@
         public TaskInProgress(Task task) throws IOException {
             this.task = task;
             this.lastProgressReport = System.currentTimeMillis();
-            this.localTaskDir = new File(localDir, task.getTaskId());
-            if (localTaskDir.exists()) {
-                FileUtil.fullyDelete(localTaskDir);
-            }
-            this.localTaskDir.mkdirs();
+            JobConf.deleteLocalFiles(SUBDIR+File.separator+task.getTaskId());
             localizeTask(task);
         }
 
@@ -324,8 +315,10 @@
          * So here, edit the Task's fields appropriately.
          */
         void localizeTask(Task t) throws IOException {
-            File localJobFile = new File(localTaskDir, "job.xml");
-            File localJarFile = new File(localTaskDir, "job.jar");
+            File localJobFile =
+              JobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.xml");
+            File localJarFile =
+              JobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.jar");
 
             String jobFile = t.getJobFile();
             fs.copyToLocalFile(new File(jobFile), localJobFile);
@@ -487,7 +480,7 @@
                 runner.close();
             } catch (IOException ie) {
             }
-            FileUtil.fullyDelete(localTaskDir);
+            JobConf.deleteLocalFiles(SUBDIR+File.separator+task.getTaskId());
         }
     }
 


Reply via email to