HDFS-8602. Erasure Coding: Client can't read(decode) the EC files which have 
corrupt blocks. Contributed by Jing Zhao and Kai Sasaki.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5bcd26d4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5bcd26d4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5bcd26d4

Branch: refs/heads/HDFS-7285-REBASE
Commit: 5bcd26d439a18294a90a66327d8f1ff697b3a62b
Parents: 99b8427
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Jun 19 14:07:38 2015 -0700
Committer: Vinayakumar B <vinayakum...@apache.org>
Committed: Thu Aug 13 17:13:44 2015 +0530

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../hadoop/hdfs/DFSStripedInputStream.java      |  15 ++-
 .../hdfs/TestReadStripedFileWithDecoding.java   | 128 +++++++++++++++++--
 3 files changed, 129 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bcd26d4/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index a12f361..2c91dad 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -314,3 +314,6 @@
 
     HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped 
block.
     (Walter Su via jing9)
+
+    HDFS-8602. Erasure Coding: Client can't read(decode) the EC files which 
have
+    corrupt blocks. (jing9 and Kai Sasaki)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bcd26d4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index a7339b7..878e5e1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -45,7 +45,6 @@ import static 
org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResu
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.htrace.Span;
@@ -340,7 +339,7 @@ public class DFSStripedInputStream extends DFSInputStream {
   private Callable<Void> readCell(final BlockReader reader,
       final DatanodeInfo datanode, final long currentReaderOffset,
       final long targetReaderOffset, final ByteBufferStrategy strategy,
-      final int targetLength,
+      final int targetLength, final ExtendedBlock currentBlock,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     return new Callable<Void>() {
       @Override
@@ -359,7 +358,8 @@ public class DFSStripedInputStream extends DFSInputStream {
         }
         int result = 0;
         while (result < targetLength) {
-          int ret = readToBuffer(reader, datanode, strategy, 
corruptedBlockMap);
+          int ret = readToBuffer(reader, datanode, strategy, currentBlock,
+              corruptedBlockMap);
           if (ret < 0) {
             throw new IOException("Unexpected EOS from the reader");
           }
@@ -373,21 +373,22 @@ public class DFSStripedInputStream extends DFSInputStream 
{
 
   private int readToBuffer(BlockReader blockReader,
       DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
+      ExtendedBlock currentBlock,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     try {
       return readerStrategy.doRead(blockReader, 0, 0);
     } catch (ChecksumException ce) {
       DFSClient.LOG.warn("Found Checksum error for "
-          + getCurrentBlock() + " from " + currentNode
+          + currentBlock + " from " + currentNode
           + " at " + ce.getPos());
       // we want to remember which block replicas we have tried
-      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+      addIntoCorruptedBlockMap(currentBlock, currentNode,
           corruptedBlockMap);
       throw ce;
     } catch (IOException e) {
       DFSClient.LOG.warn("Exception while reading from "
-          + getCurrentBlock() + " of " + src + " from "
+          + currentBlock + " of " + src + " from "
           + currentNode, e);
       throw e;
     }
@@ -768,7 +769,7 @@ public class DFSStripedInputStream extends DFSInputStream {
       Callable<Void> readCallable = readCell(blockReaders[chunkIndex],
           currentNodes[chunkIndex], blockReaderOffsets[chunkIndex],
           alignedStripe.getOffsetInBlock(), strategy,
-          chunk.byteBuffer.remaining(), corruptedBlockMap);
+          chunk.byteBuffer.remaining(), block.getBlock(), corruptedBlockMap);
       Future<Void> request = readingService.submit(readCallable);
       futures.put(request, chunkIndex);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bcd26d4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index 0201d07..3125e2e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -17,35 +17,37 @@
  */
 package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.*;
 
 public class TestReadStripedFileWithDecoding {
+  static final Log LOG = 
LogFactory.getLog(TestReadStripedFileWithDecoding.class);
 
   private MiniDFSCluster cluster;
-  private FileSystem fs;
+  private DistributedFileSystem fs;
 
   @Before
   public void setup() throws IOException {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
+        .numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().createErasureCodingZone("/",
         null, cellSize);
     fs = cluster.getFileSystem();
@@ -73,6 +75,112 @@ public class TestReadStripedFileWithDecoding {
     testReadWithDNFailure("/foo", cellSize * dataBlocks, 0);
   }
 
+  /**
+   * Delete a data block before reading. Verify the decoding works correctly.
+   */
+  @Test
+  public void testReadCorruptedData() throws IOException {
+    // create file
+    final Path file = new Path("/partially_deleted");
+    final int length = cellSize * dataBlocks * 2;
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, file, bytes);
+
+    // corrupt the first data block
+    // find the corresponding data node
+    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    Assert.assertNotEquals(-1, dnIndex);
+    // find the target block
+    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
+        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        cellSize, dataBlocks, parityBlocks);
+    // find the target block file
+    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
+    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile.exists());
+    // delete the block file
+    LOG.info("Deliberately removing file " + blkFile.getName());
+    Assert.assertTrue("Cannot remove file", blkFile.delete());
+    verifyRead(file, length, bytes);
+  }
+
+  /**
+   * Corrupt the content of the data block before reading.
+   */
+  @Test
+  public void testReadCorruptedData2() throws IOException {
+    // create file
+    final Path file = new Path("/partially_corrupted");
+    final int length = cellSize * dataBlocks * 2;
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, file, bytes);
+
+    // corrupt the first data block
+    // find the first data node
+    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    Assert.assertNotEquals(-1, dnIndex);
+    // find the first data block
+    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
+        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        cellSize, dataBlocks, parityBlocks);
+    // find the first block file
+    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
+    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile.exists());
+    // corrupt the block file
+    LOG.info("Deliberately corrupting file " + blkFile.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile)) {
+      out.write("corruption".getBytes());
+    }
+
+    verifyRead(file, length, bytes);
+  }
+
+  private int findFirstDataNode(Path file, long length) throws IOException {
+    BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length);
+    String name = (locs[0].getNames())[0];
+    int dnIndex = 0;
+    for (DataNode dn : cluster.getDataNodes()) {
+      int port = dn.getXferPort();
+      if (name.contains(Integer.toString(port))) {
+        return dnIndex;
+      }
+      dnIndex++;
+    }
+    return -1;
+  }
+
+  private void verifyRead(Path file, int length, byte[] expected)
+      throws IOException {
+    // pread
+    try (FSDataInputStream fsdis = fs.open(file)) {
+      byte[] buf = new byte[length];
+      int readLen = fsdis.read(0, buf, 0, buf.length);
+      Assert.assertEquals("The fileSize of file should be the same to write 
size",
+          length, readLen);
+      Assert.assertArrayEquals(expected, buf);
+    }
+
+    // stateful read
+    ByteBuffer result = ByteBuffer.allocate(length);
+    ByteBuffer buf = ByteBuffer.allocate(1024);
+    int readLen = 0;
+    int ret;
+    try (FSDataInputStream in = fs.open(file)) {
+      while ((ret = in.read(buf)) >= 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+    }
+    Assert.assertEquals("The length of file should be the same to write size",
+        length, readLen);
+    Assert.assertArrayEquals(expected, result.array());
+  }
+
   private void testReadWithDNFailure(String file, int fileSize,
       int startOffsetInFile) throws IOException {
     final int failedDNIdx = 2;

Reply via email to