Author: cutting
Date: Thu Oct 13 10:59:30 2005
New Revision: 320835

URL: http://svn.apache.org/viewcvs?rev=320835&view=rev
Log:
Store checksums for all files written and verify them on read.  CRCs are stored 
for every 512 bytes of data, so that randomly accessed data may be verified.  
Errors are reported to the filesystem implementation.  Local file errors cause 
files to be moved to a bad file directory, so that bad disk areas are not 
reused.  NDFS file errors should cause blocks to be moved to a bad block 
directory on the datanode, forcing the use of replicas of the bad blocks with 
no loss of data.  This is not yet implemented for NDFS.

Modified:
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/DistributedWebDBWriter.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/FileUtil.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/LocalFileSystem.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NDFSFileSystem.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataOutputStream.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSInputStream.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NutchFileSystem.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DF.java
    
lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/DistributedWebDBWriter.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/DistributedWebDBWriter.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/DistributedWebDBWriter.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/DistributedWebDBWriter.java
 Thu Oct 13 10:59:30 2005
@@ -1656,7 +1656,7 @@
         }
 
         // Bump number by 1.
-        DataOutputStream out = new DataOutputStream(nfs.create(openCounter, 
true));
+        DataOutputStream out = nfs.create(openCounter);
         try {
             out.write(OPEN_COUNTER_VERSION);
             out.writeInt(numOpens + 1);
@@ -1793,7 +1793,7 @@
         // 7. Finally, write out the total num of pages and links
         //
         File sectionStats = new File(newSectionDir, STATS_FILE);
-        DataOutputStream out = new DataOutputStream(nfs.create(sectionStats, 
true));
+        DataOutputStream out = nfs.create(sectionStats);
         try {
             //
             // These counts are guaranteed to be correct; they're
@@ -1854,7 +1854,7 @@
         }
         
         // Bump that number by 1.
-        out = new DataOutputStream(nfs.create(closeCounter, true));
+        out = nfs.create(closeCounter);
         try {
             out.write(CLOSE_COUNTER_VERSION);
             out.writeInt(numCloses + 1);

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/FileUtil.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/FileUtil.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/FileUtil.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/FileUtil.java Thu 
Oct 13 10:59:30 2005
@@ -54,9 +54,9 @@
         }
 
         if (nfs.isFile(src)) {
-            DataInputStream in = new DataInputStream(nfs.open(src));
+            NFSInputStream in = nfs.openRaw(src);
             try {
-                DataOutputStream out = new DataOutputStream(nfs.create(dst));
+                NFSOutputStream out = nfs.createRaw(dst, true);
                 byte buf[] = new 
byte[NutchConf.get().getInt("io.file.buffer.size", 4096)];
                 try {
                     int readBytes = in.read(buf);

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/LocalFileSystem.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/LocalFileSystem.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/LocalFileSystem.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/LocalFileSystem.java 
Thu Oct 13 10:59:30 2005
@@ -21,6 +21,7 @@
 import java.nio.channels.*;
 
 import org.apache.nutch.ndfs.NDFSFile;
+import org.apache.nutch.ndfs.DF;
 import org.apache.nutch.ndfs.NDFSFileInfo;
 import org.apache.nutch.io.UTF8;
 
@@ -88,7 +89,7 @@
     /**
      * Open the file at f
      */
-    public NFSInputStream open(File f) throws IOException {
+    public NFSInputStream openRaw(File f) throws IOException {
         if (! f.exists()) {
             throw new FileNotFoundException(f.toString());
         }
@@ -121,7 +122,8 @@
       public void write(int b) throws IOException { fos.write(b); }
     }
 
-    public NFSOutputStream create(File f, boolean overwrite) throws 
IOException {
+    public NFSOutputStream createRaw(File f, boolean overwrite)
+      throws IOException {
         if (f.exists() && ! overwrite) {
             throw new IOException("File already exists:"+f);
         }
@@ -135,7 +137,7 @@
     /**
      * Rename files/dirs
      */
-    public boolean rename(File src, File dst) throws IOException {
+    public boolean renameRaw(File src, File dst) throws IOException {
         if (useCopyForRename) {
             FileUtil.copyContents(this, src, dst, true);
             return fullyDelete(src);
@@ -145,7 +147,7 @@
     /**
      * Get rid of File f, whether a true file or dir.
      */
-    public boolean delete(File f) throws IOException {
+    public boolean deleteRaw(File f) throws IOException {
         if (f.isFile()) {
             return f.delete();
         } else return fullyDelete(f);
@@ -171,7 +173,7 @@
 
     /**
      */
-    public File[] listFiles(File f) throws IOException {
+    public File[] listFilesRaw(File f) throws IOException {
         File[] files = f.listFiles();
         if (files == null) return null;
         // 20041022, xing, Watch out here:
@@ -337,4 +339,40 @@
         }
         return dir.delete();
     }
+
+    /** Moves files to a bad file directory on the same device, so that their
+     * storage will not be reused. */
+    public void reportChecksumFailure(File f, NFSInputStream in,
+                                      long start, long length, int crc) {
+      try {
+        // canonicalize f   
+        f = f.getCanonicalFile();
+      
+        // find highest writable parent dir of f on the same device
+        String device = new DF(f.toString()).getMount();
+        File parent = f.getParentFile();
+        File dir;
+        do {
+          dir = parent;
+          parent = parent.getParentFile();
+        } while (parent.canWrite() && parent.toString().startsWith(device));
+
+        // move the file there
+        File badDir = new File(dir, "bad_files");
+        badDir.mkdirs();
+        String suffix = "." + new Random().nextInt();
+        File badFile = new File(badDir,f.getName()+suffix);
+        LOG.warning("Moving bad file " + f + " to " + badFile);
+        in.close();                               // close it first
+        f.renameTo(badFile);                      // rename it
+
+        // move checksum file too
+        File checkFile = getChecksumFile(f);
+        checkFile.renameTo(new File(badDir, checkFile.getName()+suffix));
+
+      } catch (IOException e) {
+        LOG.warning("Error moving bad file " + f + ": " + e);
+      }
+    }
+
 }

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NDFSFileSystem.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NDFSFileSystem.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NDFSFileSystem.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NDFSFileSystem.java 
Thu Oct 13 10:59:30 2005
@@ -57,28 +57,26 @@
       return new UTF8(path);
     }
 
-    /**
-     * Open the file at f
-     */
-    public NFSInputStream open(File f) throws IOException {
+    public NFSInputStream openRaw(File f) throws IOException {
       return ndfs.open(getPath(f));
     }
 
-    public NFSOutputStream create(File f, boolean overwrite) throws 
IOException {
+    public NFSOutputStream createRaw(File f, boolean overwrite)
+      throws IOException {
       return ndfs.create(getPath(f), overwrite);
     }
 
     /**
      * Rename files/dirs
      */
-    public boolean rename(File src, File dst) throws IOException {
+    public boolean renameRaw(File src, File dst) throws IOException {
       return ndfs.rename(getPath(src), getPath(dst));
     }
 
     /**
      * Get rid of File f, whether a true file or dir.
      */
-    public boolean delete(File f) throws IOException {
+    public boolean deleteRaw(File f) throws IOException {
         return ndfs.delete(getPath(f));
     }
 
@@ -103,7 +101,7 @@
 
     /**
      */
-    public File[] listFiles(File f) throws IOException {
+    public File[] listFilesRaw(File f) throws IOException {
         NDFSFileInfo info[] = ndfs.listFiles(getPath(f));
         if (info == null) {
             return new File[0];
@@ -217,8 +215,7 @@
             byte buf[] = new 
byte[NutchConf.get().getInt("io.file.buffer.size", 4096)];
             InputStream in = open(src);
             try {
-                dst.getParentFile().mkdirs();
-                OutputStream out = new BufferedOutputStream(new 
FileOutputStream(dst));
+                OutputStream out = 
NutchFileSystem.getNamed("local").create(dst);
                 try {
                     int bytesRead = in.read(buf);
                     while (bytesRead >= 0) {
@@ -302,5 +299,16 @@
         path.append(l.get(i));
       }
       return path.toString();
+    }
+
+    public void reportChecksumFailure(File f, NFSInputStream in,
+                                      long start, long length, int crc) {
+      
+      // ignore for now, causing task to fail, and hope that when task is
+      // retried it gets a different copy of the block that is not corrupt.
+
+      // FIXME: we should move the bad block(s) involved to a bad block
+      // directory on their datanode, and then re-replicate the blocks, so that
+      // no data is lost. a task may fail, but on retry it should succeed.
     }
 }

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java
 Thu Oct 13 10:59:30 2005
@@ -16,19 +16,134 @@
 package org.apache.nutch.fs;
 
 import java.io.*;
-import org.apache.nutch.util.NutchConf;
+import java.util.Arrays;
+import java.util.logging.*;
+import java.util.zip.*;
+import org.apache.nutch.util.*;
 
 /** Utility that wraps a {@link NFSInputStream} in a {@link DataInputStream}
  * and buffers input through a {@link BufferedInputStream}. */
 public class NFSDataInputStream extends DataInputStream {
+  private static final Logger LOG =
+    LogFormatter.getLogger("org.apache.nutch.fs.DataInputStream");
+
+  private static final byte[] VERSION = NFSDataOutputStream.CHECKSUM_VERSION;
+  private static final int HEADER_LENGTH = 8;
+  
+  private int bytesPerSum = 1;
   
+  /** Verify that data matches checksums. */
+  private class Checker extends FilterInputStream implements Seekable {
+    private NutchFileSystem fs;
+    private File file;
+    private NFSDataInputStream sums;
+    private Checksum sum = new CRC32();
+    private int inSum;
+
+    public Checker(NutchFileSystem fs, File file)
+      throws IOException {
+      super(fs.openRaw(file));
+      
+      this.fs = fs;
+      this.file = file;
+      File sumFile = fs.getChecksumFile(file);
+      try {
+        this.sums = new NFSDataInputStream(fs.openRaw(sumFile));
+        byte[] version = new byte[VERSION.length];
+        sums.readFully(version);
+        if (!Arrays.equals(version, VERSION))
+          throw new IOException("Not a checksum file: "+sumFile);
+        bytesPerSum = sums.readInt();
+      } catch (IOException e) {
+        LOG.warning("Problem opening checksum file: "+e+". Ignoring.");
+        stopSumming();
+      }
+    }
+
+    public void seek(long desired) throws IOException {
+      ((Seekable)in).seek(desired);
+      if (sums != null) {
+        if (desired % bytesPerSum != 0)
+          throw new IOException("Seek to non-checksummed position.");
+        try {
+          sums.seek(HEADER_LENGTH + 4*(desired/bytesPerSum));
+        } catch (IOException e) {
+          LOG.warning("Problem seeking checksum file: "+e+". Ignoring.");
+          stopSumming();
+        }
+        sum.reset();
+        inSum = 0;
+      }
+    }
+    
+    public int read(byte b[], int off, int len) throws IOException {
+      int read = in.read(b, off, len);
+
+      if (sums != null) {
+        int summed = 0;
+        while (summed < read) {
+          
+          int goal = bytesPerSum - inSum;
+          int inBuf = read - summed;
+          int toSum = inBuf <= goal ? inBuf : goal;
+          
+          sum.update(b, off+summed, toSum);
+          summed += toSum;
+          
+          inSum += toSum;
+          if (inSum == bytesPerSum) {
+            verifySum(read-(summed-bytesPerSum));
+          }
+        }
+      }
+        
+      return read;
+    }
+
+    private void verifySum(int delta) throws IOException {
+      int crc;
+      try {
+        crc = sums.readInt();
+      } catch (IOException e) {
+        LOG.warning("Problem reading checksum file: "+e+". Ignoring.");
+        stopSumming();
+        return;
+      }
+      if (crc != (int)sum.getValue()) {
+        fs.reportChecksumFailure(file, (NFSInputStream)in,
+                                 getPos()-delta, bytesPerSum, crc);
+        throw new IOException("Checksum error: "+file);
+      }
+      sum.reset();
+      inSum = 0;
+    }
+
+    public long getPos() throws IOException {
+      return ((NFSInputStream)in).getPos();
+    }
+
+    public void close() throws IOException {
+      super.close();
+      stopSumming();
+    }
+
+    private void stopSumming() {
+      if (sums != null) {
+        try {
+          sums.close();
+        } catch (IOException f) {}
+        sums = null;
+        bytesPerSum = 1;
+      }
+    }
+  }
+
   /** Cache the file position.  This improves performance significantly.*/
   private static class PositionCache extends FilterInputStream {
     long position;
 
-    public PositionCache(NFSInputStream in) throws IOException {
+    public PositionCache(InputStream in) throws IOException {
       super(in);
-      this.position = in.getPos();
     }
 
     // This is the only read() method called by BufferedInputStream, so we trap
@@ -40,7 +155,7 @@
     }
 
     public void seek(long desired) throws IOException {
-      ((NFSInputStream)in).seek(desired);         // seek underlying stream
+      ((Seekable)in).seek(desired);               // seek underlying stream
       position = desired;                         // update position
     }
       
@@ -51,25 +166,35 @@
   }
 
   /** Buffer input.  This improves performance significantly.*/
-  private static class Buffer extends BufferedInputStream {
-    public Buffer(PositionCache in, int bufferSize) throws IOException {
+  private class Buffer extends BufferedInputStream {
+    public Buffer(PositionCache in, int bufferSize)
+      throws IOException {
       super(in, bufferSize);
     }
 
     public void seek(long desired) throws IOException {
-      long current = getPos();
-      long start = (current - this.pos);
-      if (desired >= start && desired < start + this.count) {
-        this.pos += (desired - current);          // can position within buffer
+      long end = ((PositionCache)in).getPos();
+      long start = end - this.count;
+      if (desired >= start && desired < end) {
+        this.pos = (int)(desired - start);        // can position within buffer
       } else {
         this.count = 0;                           // invalidate buffer
         this.pos = 0;
 
-        ((PositionCache)in).seek(desired);        // seek underlying stream
+        long delta = desired % bytesPerSum;
+        
+        // seek to last checksummed point, if any
+        ((PositionCache)in).seek(desired - delta);
+
+        // scan to desired position
+        for (int i = 0; i < delta; i++) {
+          read();
+        }
       }
+
     }
       
-    public long getPos() throws IOException { // adjust for buffer
+    public long getPos() throws IOException {     // adjust for buffer
       return ((PositionCache)in).getPos() - (this.count - this.pos);
     }
 
@@ -82,15 +207,27 @@
 
 }
 
+  public NFSDataInputStream(NutchFileSystem fs, File file) throws IOException {
+    this(fs, file, NutchConf.get().getInt("io.file.buffer.size", 4096));
+  }
+
+  public NFSDataInputStream(NutchFileSystem fs, File file, int bufferSize)
+    throws IOException {
+    super(null);
+    this.in = new Buffer(new PositionCache(new Checker(fs, file)), bufferSize);
+  }
+    
+  /** Construct without checksums. */
   public NFSDataInputStream(NFSInputStream in) throws IOException {
     this(in, NutchConf.get().getInt("io.file.buffer.size", 4096));
   }
-
+  /** Construct without checksums. */
   public NFSDataInputStream(NFSInputStream in, int bufferSize)
     throws IOException {
-    super(new Buffer(new PositionCache(in), bufferSize));
+    super(null);
+    this.in = new Buffer(new PositionCache(in), bufferSize);
   }
-    
+  
   public void seek(long desired) throws IOException {
     ((Buffer)in).seek(desired);
   }

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataOutputStream.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataOutputStream.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataOutputStream.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataOutputStream.java
 Thu Oct 13 10:59:30 2005
@@ -16,18 +16,78 @@
 package org.apache.nutch.fs;
 
 import java.io.*;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 import org.apache.nutch.util.NutchConf;
 
-/** Utility that wraps a {@link NFSOutputStream} in a {@link DataOutputStream}
- * and buffers output through a {@link BufferedOutputStream}. */
+/** Utility that wraps a {@link NFSOutputStream} in a {@link DataOutputStream},
+ * buffers output through a {@link BufferedOutputStream} and creates a checksum
+ * file. */
 public class NFSDataOutputStream extends DataOutputStream {
+  public static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
   
+  /** Store checksums for data. */
+  private static class Summer extends FilterOutputStream {
+
+    private final int bytesPerSum
+      = NutchConf.get().getInt("io.bytes.per.checksum", 512);
+
+    private NFSDataOutputStream sums;
+    private Checksum sum = new CRC32();
+    private int inSum;
+
+    public Summer(NutchFileSystem fs, File file, boolean overwrite)
+      throws IOException {
+      super(fs.createRaw(file, overwrite));
+
+      this.sums =
+        new NFSDataOutputStream(fs.createRaw(fs.getChecksumFile(file), true));
+
+      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
+      sums.writeInt(bytesPerSum);
+    }
+
+    public void write(byte b[], int off, int len) throws IOException {
+      int summed = 0;
+      while (summed < len) {
+
+        int goal = bytesPerSum - inSum;
+        int inBuf = len - summed;
+        int toSum = inBuf <= goal ? inBuf : goal;
+
+        sum.update(b, off+summed, toSum);
+        summed += toSum;
+
+        inSum += toSum;
+        if (inSum == bytesPerSum) {
+          writeSum();
+        }
+      }
+
+      out.write(b, off, len);
+    }
+
+    private void writeSum() throws IOException {
+      if (inSum != 0) {
+        sums.writeInt((int)sum.getValue());
+        sum.reset();
+        inSum = 0;
+      }
+    }
+
+    public void close() throws IOException {
+      writeSum();
+      sums.close();
+      super.close();
+    }
+
+  }
+
   private static class PositionCache extends FilterOutputStream {
     long position;
 
-    public PositionCache(NFSOutputStream out) throws IOException {
+    public PositionCache(OutputStream out) throws IOException {
       super(out);
-      this.position = out.getPos();
     }
 
     // This is the only write() method called by BufferedOutputStream, so we
@@ -44,7 +104,7 @@
   }
 
   private static class Buffer extends BufferedOutputStream {
-    public Buffer(PositionCache out, int bufferSize) throws IOException {
+    public Buffer(OutputStream out, int bufferSize) throws IOException {
       super(out, bufferSize);
     }
 
@@ -63,10 +123,19 @@
 
   }
 
+  public NFSDataOutputStream(NutchFileSystem fs, File file,
+                             boolean overwrite, int bufferSize)
+    throws IOException {
+    super(new Buffer(new PositionCache(new Summer(fs, file, overwrite)),
+                     bufferSize));
+  }
+
+  /** Construct without checksums. */
   public NFSDataOutputStream(NFSOutputStream out) throws IOException {
     this(out, NutchConf.get().getInt("io.file.buffer.size", 4096));
   }
 
+  /** Construct without checksums. */
   public NFSDataOutputStream(NFSOutputStream out, int bufferSize)
     throws IOException {
     super(new Buffer(new PositionCache(out), bufferSize));

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSInputStream.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSInputStream.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSInputStream.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSInputStream.java 
Thu Oct 13 10:59:30 2005
@@ -23,7 +23,7 @@
  *
  * @author Mike Cafarella
  *****************************************************************/
-public abstract class NFSInputStream extends InputStream {
+public abstract class NFSInputStream extends InputStream implements Seekable {
     /**
      * Seek to the given offset from the start of the file.
      * The next read() will be from that location.  Can't

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NutchFileSystem.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NutchFileSystem.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NutchFileSystem.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NutchFileSystem.java 
Thu Oct 13 10:59:30 2005
@@ -106,6 +106,16 @@
       return fs;
     }
 
+    /** Return the name of the checksum file associated with a file.*/
+    public static File getChecksumFile(File file) {
+      return new File(file.getParentFile(), "."+file.getName()+".crc");
+    }
+
+    /** Return true iff file is a checksum file name.*/
+    public static boolean isChecksumFile(File file) {
+      String name = file.getName();
+      return name.startsWith(".") && name.endsWith(".crc");
+    }
 
     ///////////////////////////////////////////////////////////////
     // NutchFileSystem
@@ -116,17 +126,48 @@
     }
 
     /**
+     * Opens an NFSDataInputStream for the indicated File.
+     */
+    public NFSDataInputStream open(File f) throws IOException {
+      return open(f, NutchConf.get().getInt("io.file.buffer.size", 4096));
+    }
+
+    /**
+     * Opens an NFSDataInputStream at the indicated File.
+     * @param f the file name to open
+     * @param overwrite if a file with this name already exists, then if true,
+     *   the file will be overwritten, and if false an error will be thrown.
+     * @param bufferSize the size of the buffer to be used.
+     */
+    public NFSDataInputStream open(File f, int bufferSize) throws IOException {
+      return new NFSDataInputStream(this, f, bufferSize);
+    }
+
+    /**
      * Opens an InputStream for the indicated File, whether local
      * or via NDFS.
      */
-    public abstract NFSInputStream open(File f) throws IOException;
+    public abstract NFSInputStream openRaw(File f) throws IOException;
 
     /**
-     * Opens an OutputStream at the indicated File.
+     * Opens an NFSDataOutputStream at the indicated File.
      * Files are overwritten by default.
      */
-    public NFSOutputStream create(File f) throws IOException {
-        return create(f, true);
+    public NFSDataOutputStream create(File f) throws IOException {
+      return create(f, true,
+                    NutchConf.get().getInt("io.file.buffer.size", 4096));
+    }
+
+    /**
+     * Opens an NFSDataOutputStream at the indicated File.
+     * @param f the file name to open
+     * @param overwrite if a file with this name already exists, then if true,
+     *   the file will be overwritten, and if false an error will be thrown.
+     * @param bufferSize the size of the buffer to be used.
+     */
+    public NFSDataOutputStream create(File f, boolean overwrite,
+                                      int bufferSize) throws IOException {
+      return new NFSDataOutputStream(this, f, overwrite, bufferSize);
     }
 
     /** Opens an OutputStream at the indicated File.
@@ -134,7 +175,8 @@
      * @param overwrite if a file with this name already exists, then if true,
      *   the file will be overwritten, and if false an error will be thrown.
      */
-    public abstract NFSOutputStream create(File f, boolean overwrite) throws 
IOException;
+    public abstract NFSOutputStream createRaw(File f, boolean overwrite)
+      throws IOException;
 
     /**
      * Creates the given File as a brand-new zero-length file.  If
@@ -144,10 +186,10 @@
         if (exists(f)) {
             return false;
         } else {
-            OutputStream out = create(f);
+            OutputStream out = createRaw(f, false);
             try {
             } finally {
-                out.close();
+              out.close();
             }
             return true;
         }
@@ -157,24 +199,52 @@
      * Renames File src to File dst.  Can take place on local fs
      * or remote NDFS.
      */
-    public abstract boolean rename(File src, File dst) throws IOException;
+    public boolean rename(File src, File dst) throws IOException {
+      if (isDirectory(src)) {
+        return renameRaw(src, dst);
+      } else {
+
+        boolean value = renameRaw(src, dst);
+
+        File checkFile = getChecksumFile(src);
+        if (exists(checkFile))
+          renameRaw(checkFile, getChecksumFile(dst)); // try to rename checksum
+
+        return value;
+      }
+      
+    }
 
     /**
-     * Deletes File
+     * Renames File src to File dst.  Can take place on local fs
+     * or remote NDFS.
      */
-    public abstract boolean delete(File f) throws IOException;
+    public abstract boolean renameRaw(File src, File dst) throws IOException;
 
     /**
-     * Check if exists
+     * Deletes File
      */
-    public abstract boolean exists(File f) throws IOException;
+    public boolean delete(File f) throws IOException {
+      if (isDirectory(f)) {
+        return deleteRaw(f);
+      } else {
+        deleteRaw(getChecksumFile(f));            // try to delete checksum
+        return deleteRaw(f);
+      }
+    }
 
     /**
+     * Deletes File
      */
-    public abstract boolean isDirectory(File f) throws IOException;
+    public abstract boolean deleteRaw(File f) throws IOException;
 
     /**
+     * Check if exists
      */
+    public abstract boolean exists(File f) throws IOException;
+
+    public abstract boolean isDirectory(File f) throws IOException;
+
     public boolean isFile(File f) throws IOException {
         if (exists(f) && ! isDirectory(f)) {
             return true;
@@ -183,17 +253,21 @@
         }
     }
     
-    /**
-     */
     public abstract long getLength(File f) throws IOException;
 
-    /**
-     */
-    public abstract File[] listFiles(File f) throws IOException;
+    public File[] listFiles(File f) throws IOException {
+      return listFiles(f, new FileFilter() {
+          public boolean accept(File file) {
+            return !isChecksumFile(file);
+          }
+        });
+    }
+
+    public abstract File[] listFilesRaw(File f) throws IOException;
 
     public File[] listFiles(File f, FileFilter filter) throws IOException {
         Vector results = new Vector();
-        File listing[] = listFiles(f);
+        File listing[] = listFilesRaw(f);
         for (int i = 0; i < listing.length; i++) {
             if (filter.accept(listing[i])) {
                 results.add(listing[i]);
@@ -281,4 +355,17 @@
      * release any held locks.
      */
     public abstract void close() throws IOException;
+
+    /**
+     * Report a checksum error to the file system.
+     * @param f the file name containing the error
+     * @param in the stream open on the file
+     * @param start the position of the beginning of the bad data in the file
+     * @param length the length of the bad data in the file
+     * @param crc the expected CRC32 of the data
+     */
+    public abstract void reportChecksumFailure(File f, NFSInputStream in,
+                                               long start, long length,
+                                               int crc);
+
 }

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java
 Thu Oct 13 10:59:30 2005
@@ -150,7 +150,7 @@
 
     /** Shared by clones. */
     private class Descriptor {
-      public NFSInputStream in;
+      public NFSDataInputStream in;
       public long position;                       // cache of in.getPos()
       public Descriptor(File file) throws IOException {
         this.in = fs.open(file);
@@ -209,7 +209,7 @@
   }
 
   private class NdfsIndexOutput extends BufferedIndexOutput {
-    private NFSOutputStream out;
+    private NFSDataOutputStream out;
 
     public NdfsIndexOutput(File path) throws IOException {
       out = fs.create(path);

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java 
Thu Oct 13 10:59:30 2005
@@ -91,8 +91,7 @@
       throws IOException {
       this.nfs = nfs;
       this.target = new File(name);
-      init(new NFSDataOutputStream(nfs.create(target)),
-           keyClass, valClass, compress);
+      init(nfs.create(target), keyClass, valClass, compress);
     }
     
     /** Write to an arbitrary stream using a specified buffer size. */
@@ -233,7 +232,7 @@
       this.nfs = nfs;
       this.file = name;
       File file = new File(name);
-      this.in = new NFSDataInputStream(nfs.open(file), bufferSize);
+      this.in = nfs.open(file, bufferSize);
       this.end = nfs.getLength(file);
       init();
     }
@@ -242,7 +241,7 @@
       throws IOException {
       this.nfs = nfs;
       this.file = file;
-      this.in = new NFSDataInputStream(nfs.open(new File(file)), bufferSize);
+      this.in = nfs.open(new File(file), bufferSize);
       seek(start);
       init();
 
@@ -585,7 +584,7 @@
       private void flush(int count, boolean done) throws IOException {
         if (out == null) {
           outName = done ? outFile : outFile+".0";
-          out = new NFSDataOutputStream(nfs.create(new File(outName)));
+          out = nfs.create(new File(outName));
         }
 
         if (!done) {                              // an intermediate file
@@ -683,7 +682,7 @@
           new MergeQueue(factor, last ? outFile : outFile+"."+pass, last);
 
         this.inName = outFile+"."+(pass-1);
-        this.in = new NFSDataInputStream(nfs.open(new File(inName)));
+        this.in = nfs.open(new File(inName));
       }
 
       public void close() throws IOException {
@@ -818,9 +817,7 @@
       public MergeQueue(int size, String outName, boolean done)
         throws IOException {
         initialize(size);
-        this.out =
-          new NFSDataOutputStream(nfs.create(new File(outName)),
-                                  memory/(factor+1));
+        this.out = nfs.create(new File(outName), true, memory/(factor+1));
         this.done = done;
       }
 

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java 
Thu Oct 13 10:59:30 2005
@@ -246,8 +246,7 @@
         }
 
         // Write job file to JobTracker's fs
-        NFSDataOutputStream out =
-          new NFSDataOutputStream(getFs().create(submitJobFile));
+        NFSDataOutputStream out = getFs().create(submitJobFile);
         try {
           job.write(out);
         } finally {

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
 Thu Oct 13 10:59:30 2005
@@ -77,7 +77,7 @@
           File mapOut = MapOutputFile.getOutputFile(mapId, 0);
           File reduceIn = MapOutputFile.getInputFile(mapId, reduceId);
           reduceIn.getParentFile().mkdirs();
-          if (!mapOut.renameTo(reduceIn))
+          if (!NutchFileSystem.getNamed("local").rename(mapOut, reduceIn))
             throw new IOException("Couldn't rename " + mapOut);
           MapOutputFile.removeAll(mapId);
         }

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
 Thu Oct 13 10:59:30 2005
@@ -88,7 +88,7 @@
     // write the length-prefixed file content to the wire
     File file = getOutputFile(mapTaskId, partition);
     out.writeLong(file.length());
-    InputStream in = new FileInputStream(file);
+    NFSDataInputStream in = NutchFileSystem.getNamed("local").open(file);
     try {
       byte[] buffer = new byte[8192];
       int l;
@@ -112,8 +112,7 @@
     long length = in.readLong();
     float progPerByte = 1.0f / length;
     long unread = length;
-    file.getParentFile().mkdirs();                // make directory
-    OutputStream out = new FileOutputStream(file);
+    NFSDataOutputStream out = NutchFileSystem.getNamed("local").create(file);
     try {
       byte[] buffer = new byte[8192];
       while (unread > 0) {

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
 Thu Oct 13 10:59:30 2005
@@ -42,8 +42,7 @@
     final long end = start + split.getLength();
 
     // open the file and seek to the start of the split
-    final NFSDataInputStream in =
-      new NFSDataInputStream(fs.open(split.getFile()));
+    final NFSDataInputStream in = fs.open(split.getFile());
     
     if (start != 0) {
       in.seek(start-1);

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
 Thu Oct 13 10:59:30 2005
@@ -32,8 +32,7 @@
 
     File file = new File(job.getOutputDir(), name);
 
-    final NFSDataOutputStream out =
-      new NFSDataOutputStream(fs.create(file, true));
+    final NFSDataOutputStream out = fs.create(file);
 
     return new RecordWriter() {
         public synchronized void write(WritableComparable key, Writable value)

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DF.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DF.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DF.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DF.java Thu Oct 
13 10:59:30 2005
@@ -25,7 +25,7 @@
 
 /** Filesystem disk space usage statistics.  Uses the unix 'df' program.
  * Tested on Linux, FreeBSD and Cygwin. */
-class DF {
+public class DF {
   private String filesystem;
   private long capacity;
   private long used;

Modified: 
lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java?rev=320835&r1=320834&r2=320835&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java
 (original)
+++ 
lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java
 Thu Oct 13 10:59:30 2005
@@ -307,8 +307,7 @@
 
       reporter.setStatus("opening " + name);
 
-      NFSDataInputStream in =
-        new NFSDataInputStream(fs.open(new File(DATA_DIR, name)));
+      NFSDataInputStream in = fs.open(new File(DATA_DIR, name));
         
       try {
         for (int i = 0; i < SEEKS_PER_FILE; i++) {


Reply via email to