http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 3ef6390..cbbafc3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -17,23 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.util.LightWeightResizableGSet;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class is used by datanodes to maintain meta data of its replicas.
  * It provides a general interface for meta information of a replica.
@@ -42,81 +39,26 @@ import com.google.common.annotations.VisibleForTesting;
 abstract public class ReplicaInfo extends Block
     implements Replica, LightWeightResizableGSet.LinkedElement {
 
-  /** For implementing {@link LightWeightResizableGSet.LinkedElement} interface */
+  /** For implementing {@link LightWeightResizableGSet.LinkedElement}. */
   private LightWeightResizableGSet.LinkedElement next;
 
-  /** volume where the replica belongs */
+  /** volume where the replica belongs. */
   private FsVolumeSpi volume;
-  
-  /** directory where block & meta files belong */
-  
-  /**
-   * Base directory containing numerically-identified sub directories and
-   * possibly blocks.
-   */
-  private File baseDir;
-  
-  /**
-   * Whether or not this replica's parent directory includes subdirs, in which
-   * case we can generate them based on the replica's block ID
-   */
-  private boolean hasSubdirs;
-  
-  private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
   /**
-   * Constructor
-   * @param block a block
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   */
-  ReplicaInfo(Block block, FsVolumeSpi vol, File dir) {
-    this(block.getBlockId(), block.getNumBytes(), 
-        block.getGenerationStamp(), vol, dir);
-  }
-  
-  /**
-   * Constructor
-   * @param blockId block id
-   * @param len replica length
-   * @param genStamp replica generation stamp
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   */
-  ReplicaInfo(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir) {
+  * Constructor
+  * @param vol volume where replica is located
+  * @param blockId block id
+  * @param len replica length
+  * @param genStamp replica generation stamp
+  */
+  ReplicaInfo(FsVolumeSpi vol, long blockId, long len, long genStamp) {
     super(blockId, len, genStamp);
     this.volume = vol;
-    setDirInternal(dir);
-  }
-
-  /**
-   * Copy constructor.
-   * @param from where to copy from
-   */
-  ReplicaInfo(ReplicaInfo from) {
-    this(from, from.getVolume(), from.getDir());
-  }
-  
-  /**
-   * Get the full path of this replica's data file
-   * @return the full path of this replica's data file
-   */
-  public File getBlockFile() {
-    return new File(getDir(), getBlockName());
   }
   
   /**
-   * Get the full path of this replica's meta file
-   * @return the full path of this replica's meta file
-   */
-  public File getMetaFile() {
-    return new File(getDir(),
-        DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp()));
-  }
-  
-  /**
-   * Get the volume where this replica is located on disk
+   * Get the volume where this replica is located on disk.
    * @return the volume where this replica is located on disk
    */
   public FsVolumeSpi getVolume() {
@@ -124,7 +66,7 @@ abstract public class ReplicaInfo extends Block
   }
   
   /**
-   * Set the volume where this replica is located on disk
+   * Set the volume where this replica is located on disk.
    */
   void setVolume(FsVolumeSpi vol) {
     this.volume = vol;
@@ -137,156 +79,182 @@ abstract public class ReplicaInfo extends Block
   public String getStorageUuid() {
     return volume.getStorageID();
   }
-  
+
   /**
-   * Return the parent directory path where this replica is located
-   * @return the parent directory path where this replica is located
+   * Number of bytes reserved for this replica on disk.
    */
-  File getDir() {
-    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
-        getBlockId()) : baseDir;
+  public long getBytesReserved() {
+    return 0;
   }
 
   /**
-   * Set the parent directory where this replica is located
-   * @param dir the parent directory where the replica is located
+   * Get the {@code URI} for where the data of this replica is stored.
+   * @return {@code URI} for the location of replica data.
    */
-  public void setDir(File dir) {
-    setDirInternal(dir);
-  }
+  abstract public URI getBlockURI();
 
-  private void setDirInternal(File dir) {
-    if (dir == null) {
-      baseDir = null;
-      return;
-    }
-
-    ReplicaDirInfo dirInfo = parseBaseDir(dir);
-    this.hasSubdirs = dirInfo.hasSubidrs;
-    
-    synchronized (internedBaseDirs) {
-      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
-        // Create a new String path of this file and make a brand new File object
-        // to guarantee we drop the reference to the underlying char[] storage.
-        File baseDir = new File(dirInfo.baseDirPath);
-        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
-      }
-      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
-    }
-  }
+  /**
+   * Returns an {@link InputStream} to the replica's data.
+   * @param seekOffset the offset at which the read is started from.
+   * @return the {@link InputStream} to read the replica data.
+   * @throws IOException if an error occurs in opening a stream to the data.
+   */
+  abstract public InputStream getDataInputStream(long seekOffset)
+      throws IOException;
 
-  @VisibleForTesting
-  public static class ReplicaDirInfo {
-    public String baseDirPath;
-    public boolean hasSubidrs;
+  /**
+   * Returns an {@link OutputStream} to the replica's data.
+   * @param append indicates if the block should be opened for append.
+   * @return the {@link OutputStream} to write to the replica.
+   * @throws IOException if an error occurs in creating an {@link OutputStream}.
+   */
+  abstract public OutputStream getDataOutputStream(boolean append)
+      throws IOException;
 
-    public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
-      this.baseDirPath = baseDirPath;
-      this.hasSubidrs = hasSubidrs;
-    }
-  }
-  
-  @VisibleForTesting
-  public static ReplicaDirInfo parseBaseDir(File dir) {
-    
-    File currentDir = dir;
-    boolean hasSubdirs = false;
-    while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
-      hasSubdirs = true;
-      currentDir = currentDir.getParentFile();
-    }
-    
-    return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
-  }
+  /**
+   * @return true if the replica's data exists.
+   */
+  abstract public boolean blockDataExists();
 
   /**
-   * Number of bytes reserved for this replica on disk.
+   * Used to delete the replica's block data.
+   *
+   * @return true if the replica's data is successfully deleted.
    */
-  public long getBytesReserved() {
-    return 0;
-  }
+  abstract public boolean deleteBlockData();
 
   /**
-   * Number of bytes originally reserved for this replica. The actual
-   * reservation is adjusted as data is written to disk.
+   * @return the length of the block on storage.
+   */
+  abstract public long getBlockDataLength();
+
+  /**
+   * Get the {@code URI} for where the metadata of this replica is stored.
    *
-   * @return the number of bytes originally reserved for this replica.
+   * @return {@code URI} for the location of replica metadata.
    */
-  public long getOriginalBytesReserved() {
-    return 0;
-  }
+  abstract public URI getMetadataURI();
 
   /**
-   * Copy specified file into a temporary file. Then rename the
-   * temporary file to the original name. This will cause any
-   * hardlinks to the original file to be removed. The temporary
-   * files are created in the same directory. The temporary files will
-   * be recovered (especially on Windows) on datanode restart.
+   * Returns an {@link InputStream} to the replica's metadata.
+   * @param offset the offset at which the read is started from.
+   * @return the {@link LengthInputStream} to read the replica metadata.
+   * @throws IOException
    */
-  private void breakHardlinks(File file, Block b) throws IOException {
-    File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
-    try {
-      FileInputStream in = new FileInputStream(file);
-      try {
-        FileOutputStream out = new FileOutputStream(tmpFile);
-        try {
-          IOUtils.copyBytes(in, out, 16 * 1024);
-        } finally {
-          out.close();
-        }
-      } finally {
-        in.close();
-      }
-      if (file.length() != tmpFile.length()) {
-        throw new IOException("Copy of file " + file + " size " + file.length()+
-                              " into file " + tmpFile +
-                              " resulted in a size of " + tmpFile.length());
-      }
-      FileUtil.replaceFile(tmpFile, file);
-    } catch (IOException e) {
-      boolean done = tmpFile.delete();
-      if (!done) {
-        DataNode.LOG.info("detachFile failed to delete temporary file " +
-                          tmpFile);
-      }
-      throw e;
-    }
-  }
+  abstract public LengthInputStream getMetadataInputStream(long offset)
+      throws IOException;
+
+  /**
+   * Returns an {@link OutputStream} to the replica's metadata.
+   * @param append indicates if the block metadata should be opened for append.
+   * @return the {@link OutputStream} to write to the replica's metadata.
+   * @throws IOException if an error occurs in creating an {@link OutputStream}.
+   */
+  abstract public OutputStream getMetadataOutputStream(boolean append)
+      throws IOException;
+
+  /**
+   * @return true if the replica's metadata exists.
+   */
+  abstract public boolean metadataExists();
 
   /**
-   * This function "breaks hardlinks" to the current replica file.
+   * Used to delete the replica's metadata.
    *
-   * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
-   * file.  This cleverly ensures that both the old and the new storage
-   * directories can contain the same block file, without using additional space
-   * for the data.
+   * @return true if the replica's metadata is successfully deleted.
+   */
+  abstract public boolean deleteMetadata();
+
+  /**
+   * @return the length of the metadata on storage.
+   */
+  abstract public long getMetadataLength();
+
+  /**
+   * Rename the metadata {@link URI} to that referenced by {@code destURI}.
    *
-   * However, when we want to append to the replica file, we need to "break" the
-   * hardlink to ensure that the old snapshot continues to contain the old data
-   * length.  If we failed to do that, we could roll back to the previous/
-   * directory during a downgrade, and find that the block contents were longer
-   * than they were at the time of upgrade.
+   * @param destURI the target {@link URI}.
+   * @return true if the rename is successful.
+   * @throws IOException if an exception occurs in the rename.
+   */
+  abstract public boolean renameMeta(URI destURI) throws IOException;
+
+  /**
+   * Rename the data {@link URI} to that referenced by {@code destURI}.
    *
-   * @return true only if data was copied.
+   * @param destURI the target {@link URI}.
+   * @return true if the rename is successful.
+   * @throws IOException if an exception occurs in the rename.
+   */
+  abstract public boolean renameData(URI destURI) throws IOException;
+
+  /**
+   * Update this replica with the {@link StorageLocation} found.
+   * @param replicaLocation the {@link StorageLocation} found for this replica.
+   */
+  abstract public void updateWithReplica(StorageLocation replicaLocation);
+
+  /**
+   * Check whether the block was pinned.
+   * @param localFS the local filesystem to use.
+   * @return true if the block is pinned.
    * @throws IOException
    */
-  public boolean breakHardLinksIfNeeded() throws IOException {
-    File file = getBlockFile();
-    if (file == null || getVolume() == null) {
-      throw new IOException("detachBlock:Block not found. " + this);
-    }
-    File meta = getMetaFile();
-
-    int linkCount = HardLink.getLinkCount(file);
-    if (linkCount > 1) {
-      DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
-          "block " + this);
-      breakHardlinks(file, this);
-    }
-    if (HardLink.getLinkCount(meta) > 1) {
-      breakHardlinks(meta, this);
-    }
-    return true;
+  abstract public boolean getPinning(LocalFileSystem localFS)
+      throws IOException;
+
+  /**
+   * Set a block to be pinned on this datanode so that it cannot be moved
+   * by Balancer/Mover.
+   *
+   * @param localFS the local filesystem to use.
+   * @throws IOException if there is an exception in the pinning.
+   */
+  abstract public void setPinning(LocalFileSystem localFS) throws IOException;
+
+  /**
+   * Bump a replica's generation stamp to a new one.
+   * Its on-disk meta file name is renamed to be the new one too.
+   *
+   * @param newGS new generation stamp
+   * @throws IOException if the change fails
+   */
+  abstract public void bumpReplicaGS(long newGS) throws IOException;
+
+  abstract public ReplicaInfo getOriginalReplica();
+
+  /**
+   * Get the recovery id.
+   * @return the generation stamp that the replica will be bumped to
+   */
+  abstract public long getRecoveryID();
+
+  /**
+   * Set the recovery id.
+   * @param recoveryId the new recoveryId
+   */
+  abstract public void setRecoveryID(long recoveryId);
+
+  abstract public boolean breakHardLinksIfNeeded() throws IOException;
+
+  abstract public ReplicaRecoveryInfo createInfo();
+
+  abstract public int compareWith(ScanInfo info);
+
+  abstract public void truncateBlock(long newLength) throws IOException;
+
+  abstract public void copyMetadata(URI destination) throws IOException;
+
+  abstract public void copyBlockdata(URI destination) throws IOException;
+
+  /**
+   * Number of bytes originally reserved for this replica. The actual
+   * reservation is adjusted as data is written to disk.
+   *
+   * @return the number of bytes originally reserved for this replica.
+   */
+  public long getOriginalBytesReserved() {
+    return 0;
   }
 
   @Override  //Object
@@ -298,7 +266,7 @@ abstract public class ReplicaInfo extends Block
         + "\n  getBytesOnDisk()  = " + getBytesOnDisk()
         + "\n  getVisibleLength()= " + getVisibleLength()
         + "\n  getVolume()       = " + getVolume()
-        + "\n  getBlockFile()    = " + getBlockFile();
+        + "\n  getBlockURI()     = " + getBlockURI();
   }
 
   @Override

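The hunks above replace ReplicaInfo's File-based accessors (getBlockFile/getMetaFile) with abstract URI- and stream-based methods, so callers no longer assume a replica lives on the local filesystem. A minimal sketch of reading a replica's data through the new API follows; the helper class and method names are illustrative only and are not part of this patch.

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
    import org.apache.hadoop.io.IOUtils;

    public class ReplicaStreamCopy {
      /** Copy a replica's block data to {@code out} through the stream API, from offset 0. */
      static void copyBlockData(ReplicaInfo replica, OutputStream out)
          throws IOException {
        try (InputStream in = replica.getDataInputStream(0)) {
          // 16 KB buffer, matching the size used by the removed breakHardlinks() code.
          IOUtils.copyBytes(in, out, 16 * 1024);
        }
      }
    }
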
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
index 558ee21..09140e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -31,19 +29,19 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
  * A recovery with higher recovery id preempts recoveries with a lower id.
  *
  */
-public class ReplicaUnderRecovery extends ReplicaInfo {
-  private ReplicaInfo original; // the original replica that needs to be recovered
+public class ReplicaUnderRecovery extends LocalReplica {
+  private LocalReplica original; // original replica to be recovered
   private long recoveryId; // recovery id; it is also the generation stamp 
                            // that the replica will be bumped to after recovery
 
   public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
-    super(replica, replica.getVolume(), replica.getDir());
+    super(replica, replica.getVolume(), ((LocalReplica)replica).getDir());
     if ( replica.getState() != ReplicaState.FINALIZED &&
          replica.getState() != ReplicaState.RBW &&
          replica.getState() != ReplicaState.RWR ) {
       throw new IllegalArgumentException("Cannot recover replica: " + replica);
     }
-    this.original = replica;
+    this.original = (LocalReplica) replica;
     this.recoveryId = recoveryId;
   }
 
@@ -53,22 +51,16 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
    */
   public ReplicaUnderRecovery(ReplicaUnderRecovery from) {
     super(from);
-    this.original = from.getOriginalReplica();
+    this.original = (LocalReplica) from.getOriginalReplica();
     this.recoveryId = from.getRecoveryID();
   }
 
-  /** 
-   * Get the recovery id
-   * @return the generation stamp that the replica will be bumped to 
-   */
+  @Override
   public long getRecoveryID() {
     return recoveryId;
   }
 
-  /** 
-   * Set the recovery id
-   * @param recoveryId the new recoveryId
-   */
+  @Override
   public void setRecoveryID(long recoveryId) {
     if (recoveryId > this.recoveryId) {
       this.recoveryId = recoveryId;
@@ -82,6 +74,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
    * Get the original replica that's under recovery
    * @return the original replica under recovery
    */
+  @Override
   public ReplicaInfo getOriginalReplica() {
     return original;
   }
@@ -120,9 +113,9 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
   }
   
   @Override //ReplicaInfo
-  public void setDir(File dir) {
-    super.setDir(dir);
-    original.setDir(dir);
+  public void updateWithReplica(StorageLocation replicaLocation) {
+    super.updateWithReplica(replicaLocation);
+    original.updateWithReplica(replicaLocation);
   }
   
   @Override //ReplicaInfo
@@ -148,6 +141,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
         + "\n  original=" + original;
   }
 
+  @Override
   public ReplicaRecoveryInfo createInfo() {
     return new ReplicaRecoveryInfo(original.getBlockId(), 
         original.getBytesOnDisk(), original.getGenerationStamp(),

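With ReplicaUnderRecovery now extending LocalReplica and overriding the recovery accessors declared on ReplicaInfo, recovery code can keep working against the base type. A hedged sketch of the wrap-and-report step follows; the helper below is illustrative, and `replica` is assumed to be a FINALIZED, RBW, or RWR replica already tracked in the volume map.

    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
    import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;

    public class RecoverySketch {
      /** Wrap an existing replica for recovery and build the info reported back. */
      static ReplicaRecoveryInfo startRecovery(ReplicaInfo replica, long recoveryId) {
        // The constructor rejects replicas that are not FINALIZED, RBW or RWR.
        ReplicaUnderRecovery rur = new ReplicaUnderRecovery(replica, recoveryId);
        // createInfo() now overrides the abstract method on ReplicaInfo and reports
        // the original replica's block id, bytes on disk, generation stamp and state.
        return rur.createInfo();
      }
    }
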
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
index 220649d..38ef286 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
@@ -22,6 +22,7 @@ import java.io.File;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 
 /**
  * This class represents a replica that is waiting to be recovered.
@@ -32,7 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
  * client continues to write or be recovered as a result of
  * lease recovery.
  */
-public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
+public class ReplicaWaitingToBeRecovered extends LocalReplica {
 
   /**
    * Constructor
@@ -94,4 +95,28 @@ public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
   public String toString() {
     return super.toString();
   }
+
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getOriginalReplica");
+  }
+
+  @Override
+  public long getRecoveryID() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support createInfo");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index acc269a..b75ed5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -44,9 +44,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
@@ -230,10 +229,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   VolumeFailureSummary getVolumeFailureSummary();
 
   /** @return a list of finalized blocks for the given block pool. */
-  List<FinalizedReplica> getFinalizedBlocks(String bpid);
+  List<ReplicaInfo> getFinalizedBlocks(String bpid);
 
   /** @return a list of finalized blocks for the given block pool. */
-  List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid);
+  List<ReplicaInfo> getFinalizedBlocksOnPersistentStorage(String bpid);
 
   /**
    * Check whether the in-memory block record matches the block on the disk,
@@ -337,7 +336,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @param temporary the temporary replica being converted
    * @return the result RBW
    */
-  ReplicaInPipelineInterface convertTemporaryToRbw(
+  ReplicaInPipeline convertTemporaryToRbw(
       ExtendedBlock temporary) throws IOException;
 
   /**

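The FsDatasetSpi changes above widen getFinalizedBlocks and getFinalizedBlocksOnPersistentStorage to return List<ReplicaInfo> instead of List<FinalizedReplica>, and convertTemporaryToRbw now returns ReplicaInPipeline. A minimal sketch of a caller written against the widened interface follows; the wrapper class and method name are illustrative only.

    import java.util.List;

    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    public class FinalizedBlockLister {
      /** Print the block id and storage URI of every finalized replica in a block pool. */
      static void printFinalized(FsDatasetSpi<? extends FsVolumeSpi> dataset, String bpid) {
        List<ReplicaInfo> finalized = dataset.getFinalizedBlocks(bpid);
        for (ReplicaInfo replica : finalized) {
          System.out.println(replica.getBlockId() + " -> " + replica.getBlockURI());
        }
      }
    }
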
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 1e4e37a..b4384b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -45,13 +45,13 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
@@ -309,14 +309,14 @@ class BlockPoolSlice {
     return rbwFile;
   }
 
-  File addFinalizedBlock(Block b, File f) throws IOException {
+  File addFinalizedBlock(Block b, ReplicaInfo replicaInfo) throws IOException {
     File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
     if (!blockDir.exists()) {
       if (!blockDir.mkdirs()) {
         throw new IOException("Failed to mkdirs " + blockDir);
       }
     }
-    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
+    File blockFile = FsDatasetImpl.moveBlockFiles(b, replicaInfo, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     if (dfsUsage instanceof CachingGetSpaceUsed) {
       ((CachingGetSpaceUsed) dfsUsage).incDfsUsed(
@@ -329,16 +329,28 @@ class BlockPoolSlice {
    * Move a persisted replica from lazypersist directory to a subdirectory
    * under finalized.
    */
-  File activateSavedReplica(Block b, File metaFile, File blockFile)
-      throws IOException {
-    final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+  ReplicaInfo activateSavedReplica(ReplicaInfo replicaInfo,
+      RamDiskReplica replicaState) throws IOException {
+    File metaFile = replicaState.getSavedMetaFile();
+    File blockFile = replicaState.getSavedBlockFile();
+    final long blockId = replicaInfo.getBlockId();
+    final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
     final File targetBlockFile = new File(blockDir, blockFile.getName());
     final File targetMetaFile = new File(blockDir, metaFile.getName());
     FileUtils.moveFile(blockFile, targetBlockFile);
     FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
     FileUtils.moveFile(metaFile, targetMetaFile);
     FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
-    return targetBlockFile;
+
+    ReplicaInfo newReplicaInfo =
+        new ReplicaBuilder(ReplicaState.FINALIZED)
+        .setBlockId(blockId)
+        .setLength(replicaInfo.getBytesOnDisk())
+        .setGenerationStamp(replicaInfo.getGenerationStamp())
+        .setFsVolume(replicaState.getLazyPersistVolume())
+        .setDirectoryToUse(targetBlockFile.getParentFile())
+        .build();
+    return newReplicaInfo;
   }
 
   void checkDirs() throws DiskErrorException {
@@ -461,9 +473,13 @@ class BlockPoolSlice {
     long blockId = block.getBlockId();
     long genStamp = block.getGenerationStamp();
     if (isFinalized) {
-      newReplica = new FinalizedReplica(blockId,
-          block.getNumBytes(), genStamp, volume, DatanodeUtil
-          .idToBlockDir(finalizedDir, blockId));
+      newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
+          .setBlockId(blockId)
+          .setLength(block.getNumBytes())
+          .setGenerationStamp(genStamp)
+          .setFsVolume(volume)
+          .setDirectoryToUse(DatanodeUtil.idToBlockDir(finalizedDir, blockId))
+          .build();
     } else {
       File file = new File(rbwDir, block.getBlockName());
       boolean loadRwr = true;
@@ -477,9 +493,15 @@ class BlockPoolSlice {
           // It didn't expire. Load the replica as a RBW.
           // We don't know the expected block length, so just use 0
           // and don't reserve any more space for writes.
-          newReplica = new ReplicaBeingWritten(blockId,
-              validateIntegrityAndSetLength(file, genStamp),
-              genStamp, volume, file.getParentFile(), null, 0);
+          newReplica = new ReplicaBuilder(ReplicaState.RBW)
+              .setBlockId(blockId)
+              .setLength(validateIntegrityAndSetLength(file, genStamp))
+              .setGenerationStamp(genStamp)
+              .setFsVolume(volume)
+              .setDirectoryToUse(file.getParentFile())
+              .setWriterThread(null)
+              .setBytesToReserve(0)
+              .build();
           loadRwr = false;
         }
         sc.close();
@@ -496,9 +518,13 @@ class BlockPoolSlice {
       }
       // Restart meta doesn't exist or expired.
       if (loadRwr) {
-        newReplica = new ReplicaWaitingToBeRecovered(blockId,
-            validateIntegrityAndSetLength(file, genStamp),
-            genStamp, volume, file.getParentFile());
+        ReplicaBuilder builder = new ReplicaBuilder(ReplicaState.RWR)
+            .setBlockId(blockId)
+            .setLength(validateIntegrityAndSetLength(file, genStamp))
+            .setGenerationStamp(genStamp)
+            .setFsVolume(volume)
+            .setDirectoryToUse(file.getParentFile());
+        newReplica = builder.build();
       }
     }
 
@@ -614,7 +640,7 @@ class BlockPoolSlice {
 
     // it's the same block so don't ever delete it, even if GS or size
     // differs.  caller should keep the one it just discovered on disk
-    if (replica1.getBlockFile().equals(replica2.getBlockFile())) {
+    if (replica1.getBlockURI().equals(replica2.getBlockURI())) {
       return null;
     }
     if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) {
@@ -641,13 +667,11 @@ class BlockPoolSlice {
 
   private void deleteReplica(final ReplicaInfo replicaToDelete) {
     // Delete the files on disk. Failure here is okay.
-    final File blockFile = replicaToDelete.getBlockFile();
-    if (!blockFile.delete()) {
-      LOG.warn("Failed to delete block file " + blockFile);
+    if (!replicaToDelete.deleteBlockData()) {
+      LOG.warn("Failed to delete block file for replica " + replicaToDelete);
     }
-    final File metaFile = replicaToDelete.getMetaFile();
-    if (!metaFile.delete()) {
-      LOG.warn("Failed to delete meta file " + metaFile);
+    if (!replicaToDelete.deleteMetadata()) {
+      LOG.warn("Failed to delete meta file for replica " + replicaToDelete);
     }
   }
 

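BlockPoolSlice now constructs replicas through the new ReplicaBuilder instead of instantiating FinalizedReplica, ReplicaBeingWritten, or ReplicaWaitingToBeRecovered directly. The sketch below condenses the builder call mirrored from the hunks above; the wrapper class is a placeholder, and the throws clause simply matches the IOException that the surrounding BlockPoolSlice callers declare.

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    public class ReplicaBuilderSketch {
      /** Build an in-memory FINALIZED replica record for a block already on disk. */
      static ReplicaInfo buildFinalized(long blockId, long numBytes, long genStamp,
          FsVolumeSpi volume, File blockDir) throws IOException {
        return new ReplicaBuilder(ReplicaState.FINALIZED)
            .setBlockId(blockId)
            .setLength(numBytes)
            .setGenerationStamp(genStamp)
            .setFsVolume(volume)
            .setDirectoryToUse(blockDir)
            .build();
      }
    }
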
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index fdc9f83..c9160cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
 import java.io.FileDescriptor;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -34,6 +35,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.io.IOUtils;
@@ -211,12 +214,12 @@ class FsDatasetAsyncDiskService {
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.
    */
-  void deleteAsync(FsVolumeReference volumeRef, File blockFile, File metaFile,
+  void deleteAsync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete,
       ExtendedBlock block, String trashDirectory) {
     LOG.info("Scheduling " + block.getLocalBlock()
-        + " file " + blockFile + " for deletion");
+        + " replica " + replicaToDelete + " for deletion");
     ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
-        volumeRef, blockFile, metaFile, block, trashDirectory);
+        volumeRef, replicaToDelete, block, trashDirectory);
     execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
   }
   
@@ -227,19 +230,18 @@ class FsDatasetAsyncDiskService {
    *  files are deleted immediately.
    */
   class ReplicaFileDeleteTask implements Runnable {
-    final FsVolumeReference volumeRef;
-    final FsVolumeImpl volume;
-    final File blockFile;
-    final File metaFile;
-    final ExtendedBlock block;
-    final String trashDirectory;
-    
-    ReplicaFileDeleteTask(FsVolumeReference volumeRef, File blockFile,
-        File metaFile, ExtendedBlock block, String trashDirectory) {
+    private final FsVolumeReference volumeRef;
+    private final FsVolumeImpl volume;
+    private final ReplicaInfo replicaToDelete;
+    private final ExtendedBlock block;
+    private final String trashDirectory;
+
+    ReplicaFileDeleteTask(FsVolumeReference volumeRef,
+        ReplicaInfo replicaToDelete, ExtendedBlock block,
+        String trashDirectory) {
       this.volumeRef = volumeRef;
       this.volume = (FsVolumeImpl) volumeRef.getVolume();
-      this.blockFile = blockFile;
-      this.metaFile = metaFile;
+      this.replicaToDelete = replicaToDelete;
       this.block = block;
       this.trashDirectory = trashDirectory;
     }
@@ -248,15 +250,22 @@ class FsDatasetAsyncDiskService {
     public String toString() {
       // Called in AsyncDiskService.execute for displaying error messages.
       return "deletion of block " + block.getBlockPoolId() + " "
-          + block.getLocalBlock() + " with block file " + blockFile
-          + " and meta file " + metaFile + " from volume " + volume;
+          + block.getLocalBlock() + " with block file "
+          + replicaToDelete.getBlockURI() + " and meta file "
+          + replicaToDelete.getMetadataURI() + " from volume " + volume;
     }
 
     private boolean deleteFiles() {
-      return blockFile.delete() && (metaFile.delete() || !metaFile.exists());
+      return replicaToDelete.deleteBlockData() &&
+        (replicaToDelete.deleteMetadata() || !replicaToDelete.metadataExists());
     }
 
     private boolean moveFiles() {
+      if (trashDirectory == null) {
+        LOG.error("Trash dir for replica " + replicaToDelete + " is null");
+        return false;
+      }
+
       File trashDirFile = new File(trashDirectory);
       if (!trashDirFile.exists() && !trashDirFile.mkdirs()) {
         LOG.error("Failed to create trash directory " + trashDirectory);
@@ -264,20 +273,28 @@ class FsDatasetAsyncDiskService {
       }
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Moving files " + blockFile.getName() + " and " +
-            metaFile.getName() + " to trash.");
+        LOG.debug("Moving files " + replicaToDelete.getBlockURI() + " and " +
+            replicaToDelete.getMetadataURI() + " to trash.");
       }
 
-      File newBlockFile = new File(trashDirectory, blockFile.getName());
-      File newMetaFile = new File(trashDirectory, metaFile.getName());
-      return (blockFile.renameTo(newBlockFile) &&
-              metaFile.renameTo(newMetaFile));
+      final String blockName = replicaToDelete.getBlockName();
+      final long genstamp = replicaToDelete.getGenerationStamp();
+      File newBlockFile = new File(trashDirectory, blockName);
+      File newMetaFile = new File(trashDirectory,
+          DatanodeUtil.getMetaName(blockName, genstamp));
+      try {
+        return (replicaToDelete.renameData(newBlockFile.toURI()) &&
+                replicaToDelete.renameMeta(newMetaFile.toURI()));
+      } catch (IOException e) {
+        LOG.error("Error moving files to trash: " + replicaToDelete, e);
+      }
+      return false;
     }
 
     @Override
     public void run() {
-      final long blockLength = blockFile.length();
-      final long metaLength = metaFile.length();
+      final long blockLength = replicaToDelete.getBlockDataLength();
+      final long metaLength = replicaToDelete.getMetadataLength();
       boolean result;
 
       result = (trashDirectory == null) ? deleteFiles() : moveFiles();
@@ -286,7 +303,7 @@ class FsDatasetAsyncDiskService {
         LOG.warn("Unexpected error trying to "
             + (trashDirectory == null ? "delete" : "move")
             + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
-            + " at file " + blockFile + ". Ignored.");
+            + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
       } else {
         if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
           datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
@@ -294,7 +311,7 @@ class FsDatasetAsyncDiskService {
         volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
         volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
         LOG.info("Deleted " + block.getBlockPoolId() + " "
-            + block.getLocalBlock() + " file " + blockFile);
+            + block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
       }
       updateDeletedBlockId(block);
       IOUtils.cleanup(null, volumeRef);

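In FsDatasetAsyncDiskService the delete task now goes through ReplicaInfo's deleteBlockData()/deleteMetadata()/metadataExists() and the URI-based rename methods rather than java.io.File. A minimal sketch of the immediate-delete path, mirroring deleteFiles() above, follows; the helper class is illustrative only.

    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

    public class ReplicaDeleteSketch {
      /** Returns true only if both the block data and its metadata are gone afterwards. */
      static boolean deleteReplicaFiles(ReplicaInfo replica) {
        return replica.deleteBlockData()
            && (replica.deleteMetadata() || !replica.metadataExists());
      }
    }
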
