HDFS-10861. Refactor StripeReaders and use ECChunk version decode API. Contributed by Sammi Chen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/734d54c1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/734d54c1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/734d54c1

Branch: refs/heads/HADOOP-12756
Commit: 734d54c1a8950446e68098f62d8964e02ecc2890
Parents: 2b66d9e
Author: Kai Zheng <kai.zh...@intel.com>
Authored: Wed Sep 21 21:34:48 2016 +0800
Committer: Kai Zheng <kai.zh...@intel.com>
Committed: Wed Sep 21 21:34:48 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/io/ElasticByteBufferPool.java |   2 +-
 .../apache/hadoop/io/erasurecode/ECChunk.java   |  22 +
 .../io/erasurecode/rawcoder/CoderUtil.java      |   3 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  20 +-
 .../hadoop/hdfs/DFSStripedInputStream.java      | 654 +++----------------
 .../hadoop/hdfs/PositionStripeReader.java       | 104 +++
 .../hadoop/hdfs/StatefulStripeReader.java       |  95 +++
 .../org/apache/hadoop/hdfs/StripeReader.java    | 463 +++++++++++++
 .../hadoop/hdfs/util/StripedBlockUtil.java      | 158 ++---
 .../hadoop/hdfs/util/TestStripedBlockUtil.java  |   1 -
 10 files changed, 844 insertions(+), 678 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
index c35d608..023f37f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
@@ -85,7 +85,7 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
   private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
     return direct ? directBuffers : buffers;
   }
-  
+
   @Override
   public synchronized ByteBuffer getBuffer(boolean direct, int length) {
     TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
index cd7c6be..536715b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
@@ -29,6 +29,9 @@ public class ECChunk {
 
   private ByteBuffer chunkBuffer;
 
+  // TODO: should be in a more general flags
+  private boolean allZero = false;
+
   /**
    * Wrapping a ByteBuffer
    * @param buffer buffer to be wrapped by the chunk
@@ -37,6 +40,13 @@ public class ECChunk {
     this.chunkBuffer = buffer;
   }
 
+  public ECChunk(ByteBuffer buffer, int offset, int len) {
+    ByteBuffer tmp = buffer.duplicate();
+    tmp.position(offset);
+    tmp.limit(offset + len);
+    this.chunkBuffer = tmp.slice();
+  }
+
   /**
    * Wrapping a bytes array
    * @param buffer buffer to be wrapped by the chunk
@@ -45,6 +55,18 @@ public class ECChunk {
     this.chunkBuffer = ByteBuffer.wrap(buffer);
   }
 
+  public ECChunk(byte[] buffer, int offset, int len) {
+    this.chunkBuffer = ByteBuffer.wrap(buffer, offset, len);
+  }
+
+  public boolean isAllZero() {
+    return allZero;
+  }
+
+  public void setAllZero(boolean allZero) {
+    this.allZero = allZero;
+  }
+
   /**
    * Convert to ByteBuffer
    * @return ByteBuffer

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java
index b22d44f..ef34639 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java
@@ -115,6 +115,9 @@ final class CoderUtil {
         buffers[i] = null;
       } else {
         buffers[i] = chunk.getBuffer();
+        if (chunk.isAllZero()) {
+          CoderUtil.resetBuffer(buffers[i], buffers[i].remaining());
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 31fa897..dbffc64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -240,7 +240,7 @@ public class DFSInputStream extends FSInputStream
       Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
       Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
       while (oldIter.hasNext() && newIter.hasNext()) {
-        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
+        if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
           throw new IOException("Blocklist for " + src + " has changed!");
         }
       }
@@ -677,8 +677,8 @@ public class DFSInputStream extends FSInputStream
     if (oneByteBuf == null) {
       oneByteBuf = new byte[1];
     }
-    int ret = read( oneByteBuf, 0, 1 );
-    return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
+    int ret = read(oneByteBuf, 0, 1);
+    return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
   }
 
   /* This is a used by regular read() and handles ChecksumExceptions.
@@ -702,7 +702,7 @@ public class DFSInputStream extends FSInputStream
       // retry as many times as seekToNewSource allows.
       try {
         return reader.readFromBlock(blockReader, len);
-      } catch ( ChecksumException ce ) {
+      } catch (ChecksumException ce) {
         DFSClient.LOG.warn("Found Checksum error for "
             + getCurrentBlock() + " from " + currentNode
             + " at " + ce.getPos());
@@ -710,7 +710,7 @@ public class DFSInputStream extends FSInputStream
         retryCurrentNode = false;
         // we want to remember which block replicas we have tried
         corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode);
-      } catch ( IOException e ) {
+      } catch (IOException e) {
         if (!retryCurrentNode) {
           DFSClient.LOG.warn("Exception while reading from "
               + getCurrentBlock() + " of " + src + " from "
@@ -779,7 +779,9 @@ public class DFSInputStream extends FSInputStream
             DFSClient.LOG.warn("DFS Read", e);
           }
           blockEnd = -1;
-          if (currentNode != null) { addToDeadNodes(currentNode); }
+          if (currentNode != null) {
+            addToDeadNodes(currentNode);
+          }
           if (--retries == 0) {
             throw e;
           }
@@ -1397,10 +1399,10 @@ public class DFSInputStream extends FSInputStream
 
   @Override
   public long skip(long n) throws IOException {
-    if ( n > 0 ) {
+    if (n > 0) {
       long curPos = getPos();
       long fileLen = getFileLength();
-      if( n+curPos > fileLen ) {
+      if (n+curPos > fileLen) {
         n = fileLen - curPos;
       }
       seek(curPos+n);
@@ -1550,7 +1552,7 @@ public class DFSInputStream extends FSInputStream
    * Get statistics about the reads which this DFSInputStream has done.
    */
   public ReadStatistics getReadStatistics() {
-    return new ReadStatistics(readStatistics);
+    return readStatistics;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index ccaf6a7..922f74e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -17,24 +17,21 @@
  */
 package org.apache.hadoop.hdfs;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
+import org.apache.hadoop.hdfs.StripeReader.BlockReaderInfo;
+import org.apache.hadoop.hdfs.StripeReader.ReaderRetryPolicy;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripeRange;
 import org.apache.hadoop.io.ByteBufferPool;
 
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
-
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -44,7 +41,6 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,111 +49,32 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 import java.util.Collection;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
- * DFSStripedInputStream reads from striped block groups
+ * DFSStripedInputStream reads from striped block groups.
  */
 @InterfaceAudience.Private
 public class DFSStripedInputStream extends DFSInputStream {
 
-  private static class ReaderRetryPolicy {
-    private int fetchEncryptionKeyTimes = 1;
-    private int fetchTokenTimes = 1;
-
-    void refetchEncryptionKey() {
-      fetchEncryptionKeyTimes--;
-    }
-
-    void refetchToken() {
-      fetchTokenTimes--;
-    }
-
-    boolean shouldRefetchEncryptionKey() {
-      return fetchEncryptionKeyTimes > 0;
-    }
-
-    boolean shouldRefetchToken() {
-      return fetchTokenTimes > 0;
-    }
-  }
-
-  /** Used to indicate the buffered data's range in the block group */
-  private static class StripeRange {
-    /** start offset in the block group (inclusive) */
-    final long offsetInBlock;
-    /** length of the stripe range */
-    final long length;
-
-    StripeRange(long offsetInBlock, long length) {
-      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
-      this.offsetInBlock = offsetInBlock;
-      this.length = length;
-    }
-
-    boolean include(long pos) {
-      return pos >= offsetInBlock && pos < offsetInBlock + length;
-    }
-  }
-
-  private static class BlockReaderInfo {
-    final BlockReader reader;
-    final DatanodeInfo datanode;
-    /**
-     * when initializing block readers, their starting offsets are set to the same
-     * number: the smallest internal block offsets among all the readers. This is
-     * because it is possible that for some internal blocks we have to read
-     * "backwards" for decoding purpose. We thus use this offset array to track
-     * offsets for all the block readers so that we can skip data if necessary.
-     */
-    long blockReaderOffset;
-    /**
-     * We use this field to indicate whether we should use this reader. In case
-     * we hit any issue with this reader, we set this field to true and avoid
-     * using it for the next stripe.
-     */
-    boolean shouldSkip = false;
-
-    BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
-      this.reader = reader;
-      this.datanode = dn;
-      this.blockReaderOffset = offset;
-    }
-
-    void setOffset(long offset) {
-      this.blockReaderOffset = offset;
-    }
-
-    void skip() {
-      this.shouldSkip = true;
-    }
-  }
-
   private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
-
   private final BlockReaderInfo[] blockReaders;
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
   private final int groupSize;
-  /** the buffer for a complete stripe */
+  /** the buffer for a complete stripe. */
   private ByteBuffer curStripeBuf;
   private ByteBuffer parityBuf;
   private final ErasureCodingPolicy ecPolicy;
   private final RawErasureDecoder decoder;
 
   /**
-   * indicate the start/end offset of the current buffered stripe in the
-   * block group
+   * Indicate the start/end offset of the current buffered stripe in the
+   * block group.
    */
   private StripeRange curStripeRange;
-  private final CompletionService<Void> readingService;
 
   /**
    * When warning the user of a lost block in striping mode, we remember the
@@ -167,8 +84,8 @@ public class DFSStripedInputStream extends DFSInputStream {
    *
    * To minimize the overhead, we only store the datanodeUuid in this set
    */
-  private final Set<String> warnedNodes = Collections.newSetFromMap(
-      new ConcurrentHashMap<String, Boolean>());
+  private final Set<String> warnedNodes =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   DFSStripedInputStream(DFSClient dfsClient, String src,
       boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
@@ -183,8 +100,6 @@ public class DFSStripedInputStream extends DFSInputStream {
     groupSize = dataBlkNum + parityBlkNum;
     blockReaders = new BlockReaderInfo[groupSize];
     curStripeRange = new StripeRange(0, 0);
-    readingService =
-        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
     ErasureCoderOptions coderOptions = new ErasureCoderOptions(
         dataBlkNum, parityBlkNum);
     decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
@@ -198,7 +113,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     return decoder.preferDirectBuffer();
   }
 
-  private void resetCurStripeBuffer() {
+  void resetCurStripeBuffer() {
     if (curStripeBuf == null) {
       curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
           cellSize * dataBlkNum);
@@ -207,7 +122,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     curStripeRange = new StripeRange(0, 0);
   }
 
-  private ByteBuffer getParityBuffer() {
+  protected ByteBuffer getParityBuffer() {
     if (parityBuf == null) {
       parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
           cellSize * parityBlkNum);
@@ -216,6 +131,29 @@ public class DFSStripedInputStream extends DFSInputStream {
     return parityBuf;
   }
 
+  protected ByteBuffer getCurStripeBuf() {
+    return curStripeBuf;
+  }
+
+  protected String getSrc() {
+    return src;
+  }
+
+  protected DFSClient getDFSClient() {
+    return dfsClient;
+  }
+
+  protected LocatedBlocks getLocatedBlocks() {
+    return locatedBlocks;
+  }
+
+  protected ByteBufferPool getBufferPool() {
+    return BUFFER_POOL;
+  }
+
+  protected ThreadPoolExecutor getStripedReadsThreadPool(){
+    return dfsClient.getStripedReadsThreadPool();
+  }
   /**
    * When seeking into a new block group, create blockReader for each internal
    * block in the group.
@@ -268,7 +206,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     blockEnd = -1;
   }
 
-  private void closeReader(BlockReaderInfo readerInfo) {
+  protected void closeReader(BlockReaderInfo readerInfo) {
     if (readerInfo != null) {
       if (readerInfo.reader != null) {
         try {
@@ -288,6 +226,59 @@ public class DFSStripedInputStream extends DFSInputStream {
     return pos - currentLocatedBlock.getStartOffset();
   }
 
+  boolean createBlockReader(LocatedBlock block, long offsetInBlock,
+      LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
+      int chunkIndex) throws IOException {
+    BlockReader reader = null;
+    final ReaderRetryPolicy retry = new ReaderRetryPolicy();
+    DFSInputStream.DNAddrPair dnInfo =
+        new DFSInputStream.DNAddrPair(null, null, null);
+
+    while (true) {
+      try {
+        // the cached block location might have been re-fetched, so always
+        // get it from cache.
+        block = refreshLocatedBlock(block);
+        targetBlocks[chunkIndex] = block;
+
+        // internal block has one location, just rule out the deadNodes
+        dnInfo = getBestNodeDNAddrPair(block, null);
+        if (dnInfo == null) {
+          break;
+        }
+        reader = getBlockReader(block, offsetInBlock,
+            block.getBlockSize() - offsetInBlock,
+            dnInfo.addr, dnInfo.storageType, dnInfo.info);
+      } catch (IOException e) {
+        if (e instanceof InvalidEncryptionKeyException &&
+            retry.shouldRefetchEncryptionKey()) {
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+              + "encryption key was invalid when connecting to " + dnInfo.addr
+              + " : " + e);
+          dfsClient.clearDataEncryptionKey();
+          retry.refetchEncryptionKey();
+        } else if (retry.shouldRefetchToken() &&
+            tokenRefetchNeeded(e, dnInfo.addr)) {
+          fetchBlockAt(block.getStartOffset());
+          retry.refetchToken();
+        } else {
+          //TODO: handles connection issues
+          DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
+              "block" + block.getBlock(), e);
+          // re-fetch the block in case the block has been moved
+          fetchBlockAt(block.getStartOffset());
+          addToDeadNodes(dnInfo.info);
+        }
+      }
+      if (reader != null) {
+        readerInfos[chunkIndex] =
+            new BlockReaderInfo(reader, dnInfo.info, offsetInBlock);
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Read a new stripe covering the current position, and store the data in the
    * {@link #curStripeBuf}.
@@ -303,20 +294,20 @@ public class DFSStripedInputStream extends DFSInputStream {
     final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
     final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
         - (stripeIndex * stripeLen), stripeLen);
-    StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
-        stripeLimit - stripeBufOffset);
+    StripeRange stripeRange =
+        new StripeRange(offsetInBlockGroup, stripeLimit - stripeBufOffset);
 
     LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
     AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
         cellSize, blockGroup, offsetInBlockGroup,
-        offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
+        offsetInBlockGroup + stripeRange.getLength() - 1, curStripeBuf);
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     // read the whole stripe
     for (AlignedStripe stripe : stripes) {
       // Parse group to get chosen DN location
-      StripeReader sreader = new StatefulStripeReader(readingService, stripe,
-          blks, blockReaders, corruptedBlocks);
+      StripeReader sreader = new StatefulStripeReader(stripe, ecPolicy, blks,
+          blockReaders, corruptedBlocks, decoder, this);
       sreader.readStripe();
     }
     curStripeBuf.position(stripeBufOffset);
@@ -324,69 +315,8 @@ public class DFSStripedInputStream extends DFSInputStream {
     curStripeRange = stripeRange;
   }
 
-  private Callable<Void> readCells(final BlockReader reader,
-      final DatanodeInfo datanode, final long currentReaderOffset,
-      final long targetReaderOffset, final ByteBufferStrategy[] strategies,
-      final ExtendedBlock currentBlock,
-      final CorruptedBlocks corruptedBlocks) {
-    return new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        // reader can be null if getBlockReaderWithRetry failed or
-        // the reader hit exception before
-        if (reader == null) {
-          throw new IOException("The BlockReader is null. " +
-              "The BlockReader creation failed or the reader hit exception.");
-        }
-        Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
-        if (currentReaderOffset < targetReaderOffset) {
-          long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
-          Preconditions.checkState(
-              skipped == targetReaderOffset - currentReaderOffset);
-        }
-        int result = 0;
-        for (ByteBufferStrategy strategy : strategies) {
-          result += readToBuffer(reader, datanode, strategy, currentBlock,
-              corruptedBlocks);
-        }
-        return null;
-      }
-    };
-  }
-
-  private int readToBuffer(BlockReader blockReader,
-      DatanodeInfo currentNode, ByteBufferStrategy strategy,
-      ExtendedBlock currentBlock,
-      CorruptedBlocks corruptedBlocks)
-      throws IOException {
-    final int targetLength = strategy.getTargetLength();
-    int length = 0;
-    try {
-      while (length < targetLength) {
-        int ret = strategy.readFromBlock(blockReader);
-        if (ret < 0) {
-          throw new IOException("Unexpected EOS from the reader");
-        }
-        length += ret;
-      }
-      return length;
-    } catch (ChecksumException ce) {
-      DFSClient.LOG.warn("Found Checksum error for "
-          + currentBlock + " from " + currentNode
-          + " at " + ce.getPos());
-      // we want to remember which block replicas we have tried
-      corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
-      throw ce;
-    } catch (IOException e) {
-      DFSClient.LOG.warn("Exception while reading from "
-          + currentBlock + " of " + src + " from "
-          + currentNode, e);
-      throw e;
-    }
-  }
-
   /**
-   * Seek to a new arbitrary location
+   * Seek to a new arbitrary location.
    */
   @Override
   public synchronized void seek(long targetPos) throws IOException {
@@ -469,7 +399,7 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   /**
-   * Copy the data from {@link #curStripeBuf} into the given buffer
+   * Copy the data from {@link #curStripeBuf} into the given buffer.
    * @param strategy the ReaderStrategy containing the given buffer
    * @param length target length
    * @return number of bytes copied
@@ -530,17 +460,19 @@ public class DFSStripedInputStream extends DFSInputStream {
 
     AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
         ecPolicy, cellSize, blockGroup, start, end, buf);
-    CompletionService<Void> readService = new ExecutorCompletionService<>(
-        dfsClient.getStripedReadsThreadPool());
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
     try {
       for (AlignedStripe stripe : stripes) {
         // Parse group to get chosen DN location
-        StripeReader preader = new PositionStripeReader(readService, stripe,
-            blks, preaderInfos, corruptedBlocks);
-        preader.readStripe();
+        StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks,
+            preaderInfos, corruptedBlocks, decoder, this);
+        try {
+          preader.readStripe();
+        } finally {
+          preader.close();
+        }
       }
       buf.position(buf.position() + (int)(end - start + 1));
     } finally {
@@ -571,376 +503,6 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   /**
-   * The reader for reading a complete {@link AlignedStripe}. Note that an
-   * {@link AlignedStripe} may cross multiple stripes with cellSize width.
-   */
-  private abstract class StripeReader {
-    final Map<Future<Void>, Integer> futures = new HashMap<>();
-    final AlignedStripe alignedStripe;
-    final CompletionService<Void> service;
-    final LocatedBlock[] targetBlocks;
-    final CorruptedBlocks corruptedBlocks;
-    final BlockReaderInfo[] readerInfos;
-
-    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
-        LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
-                 CorruptedBlocks corruptedBlocks) {
-      this.service = service;
-      this.alignedStripe = alignedStripe;
-      this.targetBlocks = targetBlocks;
-      this.readerInfos = readerInfos;
-      this.corruptedBlocks = corruptedBlocks;
-    }
-
-    /** prepare all the data chunks */
-    abstract void prepareDecodeInputs();
-
-    /** prepare the parity chunk and block reader if necessary */
-    abstract boolean prepareParityChunk(int index);
-
-    abstract void decode();
-
-    void updateState4SuccessRead(StripingChunkReadResult result) {
-      Preconditions.checkArgument(
-          result.state == StripingChunkReadResult.SUCCESSFUL);
-      readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
-          + alignedStripe.getSpanInBlock());
-    }
-
-    private void checkMissingBlocks() throws IOException {
-      if (alignedStripe.missingChunksNum > parityBlkNum) {
-        clearFutures(futures.keySet());
-        throw new IOException(alignedStripe.missingChunksNum
-            + " missing blocks, the stripe is: " + alignedStripe
-            + "; locatedBlocks is: " + locatedBlocks);
-      }
-    }
-
-    /**
-     * We need decoding. Thus go through all the data chunks and make sure we
-     * submit read requests for all of them.
-     */
-    private void readDataForDecoding() throws IOException {
-      prepareDecodeInputs();
-      for (int i = 0; i < dataBlkNum; i++) {
-        Preconditions.checkNotNull(alignedStripe.chunks[i]);
-        if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
-          if (!readChunk(targetBlocks[i], i)) {
-            alignedStripe.missingChunksNum++;
-          }
-        }
-      }
-      checkMissingBlocks();
-    }
-
-    void readParityChunks(int num) throws IOException {
-      for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
-           i++) {
-        if (alignedStripe.chunks[i] == null) {
-          if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
-            j++;
-          } else {
-            alignedStripe.missingChunksNum++;
-          }
-        }
-      }
-      checkMissingBlocks();
-    }
-
-    boolean createBlockReader(LocatedBlock block, int chunkIndex)
-        throws IOException {
-      BlockReader reader = null;
-      final ReaderRetryPolicy retry = new ReaderRetryPolicy();
-      DNAddrPair dnInfo = new DNAddrPair(null, null, null);
-
-      while(true) {
-        try {
-          // the cached block location might have been re-fetched, so always
-          // get it from cache.
-          block = refreshLocatedBlock(block);
-          targetBlocks[chunkIndex] = block;
-
-          // internal block has one location, just rule out the deadNodes
-          dnInfo = getBestNodeDNAddrPair(block, null);
-          if (dnInfo == null) {
-            break;
-          }
-          reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
-              block.getBlockSize() - alignedStripe.getOffsetInBlock(),
-              dnInfo.addr, dnInfo.storageType, dnInfo.info);
-        } catch (IOException e) {
-          if (e instanceof InvalidEncryptionKeyException &&
-              retry.shouldRefetchEncryptionKey()) {
-            DFSClient.LOG.info("Will fetch a new encryption key and retry, "
-                + "encryption key was invalid when connecting to " + dnInfo.addr
-                + " : " + e);
-            dfsClient.clearDataEncryptionKey();
-            retry.refetchEncryptionKey();
-          } else if (retry.shouldRefetchToken() &&
-              tokenRefetchNeeded(e, dnInfo.addr)) {
-            fetchBlockAt(block.getStartOffset());
-            retry.refetchToken();
-          } else {
-            //TODO: handles connection issues
-            DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
-                "block" + block.getBlock(), e);
-            // re-fetch the block in case the block has been moved
-            fetchBlockAt(block.getStartOffset());
-            addToDeadNodes(dnInfo.info);
-          }
-        }
-        if (reader != null) {
-          readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info,
-              alignedStripe.getOffsetInBlock());
-          return true;
-        }
-      }
-      return false;
-    }
-
-    private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
-      if (chunk.useByteBuffer()) {
-        ByteBufferStrategy strategy = new ByteBufferStrategy(
-            chunk.getByteBuffer(), readStatistics, dfsClient);
-        return new ByteBufferStrategy[]{strategy};
-      } else {
-        ByteBufferStrategy[] strategies =
-            new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
-        for (int i = 0; i < strategies.length; i++) {
-          ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
-          strategies[i] =
-              new ByteBufferStrategy(buffer, readStatistics, dfsClient);
-        }
-        return strategies;
-      }
-    }
-
-    boolean readChunk(final LocatedBlock block, int chunkIndex)
-        throws IOException {
-      final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
-      if (block == null) {
-        chunk.state = StripingChunk.MISSING;
-        return false;
-      }
-      if (readerInfos[chunkIndex] == null) {
-        if (!createBlockReader(block, chunkIndex)) {
-          chunk.state = StripingChunk.MISSING;
-          return false;
-        }
-      } else if (readerInfos[chunkIndex].shouldSkip) {
-        chunk.state = StripingChunk.MISSING;
-        return false;
-      }
-
-      chunk.state = StripingChunk.PENDING;
-      Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
-          readerInfos[chunkIndex].datanode,
-          readerInfos[chunkIndex].blockReaderOffset,
-          alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
-          block.getBlock(), corruptedBlocks);
-
-      Future<Void> request = service.submit(readCallable);
-      futures.put(request, chunkIndex);
-      return true;
-    }
-
-    /** read the whole stripe. do decoding if necessary */
-    void readStripe() throws IOException {
-      for (int i = 0; i < dataBlkNum; i++) {
-        if (alignedStripe.chunks[i] != null &&
-            alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
-          if (!readChunk(targetBlocks[i], i)) {
-            alignedStripe.missingChunksNum++;
-          }
-        }
-      }
-      // There are missing block locations at this stage. Thus we need to read
-      // the full stripe and one more parity block.
-      if (alignedStripe.missingChunksNum > 0) {
-        checkMissingBlocks();
-        readDataForDecoding();
-        // read parity chunks
-        readParityChunks(alignedStripe.missingChunksNum);
-      }
-      // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
-
-      // Input buffers for potential decode operation, which remains null until
-      // first read failure
-      while (!futures.isEmpty()) {
-        try {
-          StripingChunkReadResult r = StripedBlockUtil
-              .getNextCompletedStripedRead(service, futures, 0);
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
-                + alignedStripe);
-          }
-          StripingChunk returnedChunk = alignedStripe.chunks[r.index];
-          Preconditions.checkNotNull(returnedChunk);
-          Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
-
-          if (r.state == StripingChunkReadResult.SUCCESSFUL) {
-            returnedChunk.state = StripingChunk.FETCHED;
-            alignedStripe.fetchedChunksNum++;
-            updateState4SuccessRead(r);
-            if (alignedStripe.fetchedChunksNum == dataBlkNum) {
-              clearFutures(futures.keySet());
-              break;
-            }
-          } else {
-            returnedChunk.state = StripingChunk.MISSING;
-            // close the corresponding reader
-            closeReader(readerInfos[r.index]);
-
-            final int missing = alignedStripe.missingChunksNum;
-            alignedStripe.missingChunksNum++;
-            checkMissingBlocks();
-
-            readDataForDecoding();
-            readParityChunks(alignedStripe.missingChunksNum - missing);
-          }
-        } catch (InterruptedException ie) {
-          String err = "Read request interrupted";
-          DFSClient.LOG.error(err);
-          clearFutures(futures.keySet());
-          // Don't decode if read interrupted
-          throw new InterruptedIOException(err);
-        }
-      }
-
-      if (alignedStripe.missingChunksNum > 0) {
-        decode();
-      }
-    }
-  }
-
-  class PositionStripeReader extends StripeReader {
-    private ByteBuffer[] decodeInputs = null;
-
-    PositionStripeReader(CompletionService<Void> service,
-        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
-        BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
-      super(service, alignedStripe, targetBlocks, readerInfos,
-          corruptedBlocks);
-    }
-
-    @Override
-    void prepareDecodeInputs() {
-      if (decodeInputs == null) {
-        decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe,
-            dataBlkNum, parityBlkNum);
-      }
-    }
-
-    @Override
-    boolean prepareParityChunk(int index) {
-      Preconditions.checkState(index >= dataBlkNum &&
-          alignedStripe.chunks[index] == null);
-      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
-      return true;
-    }
-
-    @Override
-    void decode() {
-      StripedBlockUtil.finalizeDecodeInputs(decodeInputs, alignedStripe);
-      StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
-          dataBlkNum, parityBlkNum, decoder);
-    }
-  }
-
-  class StatefulStripeReader extends StripeReader {
-    ByteBuffer[] decodeInputs;
-
-    StatefulStripeReader(CompletionService<Void> service,
-        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
-        BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
-      super(service, alignedStripe, targetBlocks, readerInfos,
-          corruptedBlocks);
-    }
-
-    @Override
-    void prepareDecodeInputs() {
-      if (decodeInputs == null) {
-        decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
-        final ByteBuffer cur;
-        synchronized (DFSStripedInputStream.this) {
-          cur = curStripeBuf.duplicate();
-        }
-        StripedBlockUtil.VerticalRange range = alignedStripe.range;
-        for (int i = 0; i < dataBlkNum; i++) {
-          cur.limit(cur.capacity());
-          int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
-          cur.position(pos);
-          cur.limit((int) (pos + range.spanInBlock));
-          decodeInputs[i] = cur.slice();
-          if (alignedStripe.chunks[i] == null) {
-            alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
-          }
-        }
-      }
-    }
-
-    @Override
-    boolean prepareParityChunk(int index) {
-      Preconditions.checkState(index >= dataBlkNum
-          && alignedStripe.chunks[index] == null);
-      if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
-        alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
-        // we have failed the block reader before
-        return false;
-      }
-      final int parityIndex = index - dataBlkNum;
-      ByteBuffer buf = getParityBuffer().duplicate();
-      buf.position(cellSize * parityIndex);
-      buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
-      decodeInputs[index] = buf.slice();
-      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
-      return true;
-    }
-
-    @Override
-    void decode() {
-      final int span = (int) alignedStripe.getSpanInBlock();
-      for (int i = 0; i < alignedStripe.chunks.length; i++) {
-        if (alignedStripe.chunks[i] != null &&
-            alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
-          for (int j = 0; j < span; j++) {
-            decodeInputs[i].put((byte) 0);
-          }
-          decodeInputs[i].flip();
-        } else if (alignedStripe.chunks[i] != null &&
-            alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
-          decodeInputs[i].position(0);
-          decodeInputs[i].limit(span);
-        }
-      }
-      int[] decodeIndices = new int[parityBlkNum];
-      int pos = 0;
-      for (int i = 0; i < alignedStripe.chunks.length; i++) {
-        if (alignedStripe.chunks[i] != null &&
-            alignedStripe.chunks[i].state == StripingChunk.MISSING) {
-          if (i < dataBlkNum) {
-            decodeIndices[pos++] = i;
-          } else {
-            decodeInputs[i] = null;
-          }
-        }
-      }
-      decodeIndices = Arrays.copyOf(decodeIndices, pos);
-
-      final int decodeChunkNum = decodeIndices.length;
-      ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
-      for (int i = 0; i < decodeChunkNum; i++) {
-        outputs[i] = decodeInputs[decodeIndices[i]];
-        outputs[i].position(0);
-        outputs[i].limit((int) alignedStripe.range.spanInBlock);
-        decodeInputs[decodeIndices[i]] = null;
-      }
-
-      decoder.decode(decodeInputs, decodeIndices, outputs);
-    }
-  }
-
-  /**
    * May need online read recovery, zero-copy read doesn't make
    * sense, so don't support it.
    */
@@ -957,12 +519,4 @@ public class DFSStripedInputStream extends DFSInputStream {
     throw new UnsupportedOperationException(
         "Not support enhanced byte buffer access.");
   }
-
-  /** A variation to {@link DFSInputStream#cancelAll} */
-  private void clearFutures(Collection<Future<Void>> futures) {
-    for (Future<Void> future : futures) {
-      future.cancel(false);
-    }
-    futures.clear();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java
new file mode 100644
index 0000000..5818291
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
+import org.apache.hadoop.io.erasurecode.ECChunk;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}
+ * which may cross multiple stripes with cellSize width.
+ *
+ * Decode inputs are built over a single buffer borrowed from the stream's
+ * buffer pool, so {@link #close()} must be called to give that buffer back.
+ */
+class PositionStripeReader extends StripeReader {
+  /** Pooled buffer backing all decode inputs; null while none is held. */
+  private ByteBuffer codingBuffer;
+
+  PositionStripeReader(AlignedStripe alignedStripe,
+      ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks,
+      BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks,
+      RawErasureDecoder decoder, DFSStripedInputStream dfsStripedInputStream) {
+    super(alignedStripe, ecPolicy, targetBlocks, readerInfos,
+        corruptedBlocks, decoder, dfsStripedInputStream);
+  }
+
+  /**
+   * Allocate the decode inputs the first time decoding is needed. Calls made
+   * while the coding buffer is already held do nothing.
+   */
+  @Override
+  void prepareDecodeInputs() {
+    if (codingBuffer == null) {
+      this.decodeInputs = new ECChunk[dataBlkNum + parityBlkNum];
+      initDecodeInputs(alignedStripe);
+    }
+  }
+
+  /**
+   * Create the chunk for a parity unit on top of its decode input buffer.
+   *
+   * @param index index of the parity unit; must not be below dataBlkNum and
+   *              must not have a chunk yet
+   * @return always true
+   */
+  @Override
+  boolean prepareParityChunk(int index) {
+    Preconditions.checkState(index >= dataBlkNum &&
+        alignedStripe.chunks[index] == null);
+
+    alignedStripe.chunks[index] =
+        new StripingChunk(decodeInputs[index].getBuffer());
+
+    return true;
+  }
+
+  /**
+   * Finalize the decode inputs and run the decoder.
+   */
+  @Override
+  void decode() {
+    finalizeDecodeInputs();
+    decodeAndFillBuffer(true);
+  }
+
+  /**
+   * Borrow one buffer sized for every data and parity unit, describe one
+   * bufLen-sized region of it per unit as a decode input, and create chunks
+   * for the data units that do not have one yet.
+   *
+   * @param alignedStripe the stripe whose span sizes the inputs and whose
+   *                      missing data chunks are created
+   */
+  void initDecodeInputs(AlignedStripe alignedStripe) {
+    int bufLen = (int) alignedStripe.getSpanInBlock();
+    int bufCount = dataBlkNum + parityBlkNum;
+    codingBuffer = dfsStripedInputStream.getBufferPool().
+        getBuffer(useDirectBuffer(), bufLen * bufCount);
+    ByteBuffer buffer;
+    for (int i = 0; i < decodeInputs.length; i++) {
+      buffer = codingBuffer.duplicate();
+      decodeInputs[i] = new ECChunk(buffer, i * bufLen, bufLen);
+    }
+
+    for (int i = 0; i < dataBlkNum; i++) {
+      if (alignedStripe.chunks[i] == null) {
+        alignedStripe.chunks[i] =
+            new StripingChunk(decodeInputs[i].getBuffer());
+      }
+    }
+  }
+
+  /**
+   * Drop the decode inputs and return the coding buffer to the pool. Calling
+   * this again afterwards has no further effect on the pool.
+   */
+  @Override
+  void close() {
+    if (decodeInputs != null) {
+      for (int i = 0; i < decodeInputs.length; i++) {
+        decodeInputs[i] = null;
+      }
+    }
+
+    if (codingBuffer != null) {
+      dfsStripedInputStream.getBufferPool().putBuffer(codingBuffer);
+      codingBuffer = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java
new file mode 100644
index 0000000..8879514
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
+import org.apache.hadoop.io.erasurecode.ECChunk;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}
+ * which belongs to a single stripe.
+ * Reading across multiple stripes is not supported in this reader.
+ */
+class StatefulStripeReader extends StripeReader {
+
+  StatefulStripeReader(AlignedStripe alignedStripe,
+      ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks,
+      BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks,
+      RawErasureDecoder decoder, DFSStripedInputStream dfsStripedInputStream) {
+    super(alignedStripe, ecPolicy, targetBlocks, readerInfos,
+        corruptedBlocks, decoder, dfsStripedInputStream);
+  }
+
+  /**
+   * Build the decode inputs for the data units over the stream's current
+   * stripe buffer and create chunks for data units that have none yet.
+   *
+   * StripeReader#readDataForDecoding() calls this on every invocation, so the
+   * inputs are only built once: rebuilding the array would throw away the
+   * parity inputs that {@link #prepareParityChunk(int)} stored in it.
+   */
+  @Override
+  void prepareDecodeInputs() {
+    if (decodeInputs != null) {
+      return;
+    }
+
+    final ByteBuffer cur;
+    synchronized (dfsStripedInputStream) {
+      cur = dfsStripedInputStream.getCurStripeBuf().duplicate();
+    }
+
+    decodeInputs = new ECChunk[dataBlkNum + parityBlkNum];
+    final int bufLen = (int) alignedStripe.getSpanInBlock();
+    // Take the remainder while the offset is still a long; narrowing it to
+    // int first can yield a negative position for offsets past 2 GB.
+    final int offInCell = (int) (alignedStripe.getOffsetInBlock() % cellSize);
+    for (int i = 0; i < dataBlkNum; i++) {
+      cur.limit(cur.capacity());
+      final int pos = offInCell + cellSize * i;
+      cur.position(pos);
+      cur.limit(pos + bufLen);
+      decodeInputs[i] = new ECChunk(cur.slice(), 0, bufLen);
+      if (alignedStripe.chunks[i] == null) {
+        alignedStripe.chunks[i] =
+            new StripingChunk(decodeInputs[i].getBuffer());
+      }
+    }
+  }
+
+  /**
+   * Prepare the parity unit at the given index for reading, backed by a slice
+   * of the stream's parity buffer.
+   *
+   * @param index index of the parity unit; must not be below dataBlkNum and
+   *              must not have a chunk yet
+   * @return false if the unit's block reader is flagged shouldSkip, in which
+   *         case the chunk is recorded as MISSING; true otherwise
+   */
+  @Override
+  boolean prepareParityChunk(int index) {
+    Preconditions.checkState(index >= dataBlkNum
+        && alignedStripe.chunks[index] == null);
+    if (readerInfos[index] != null && readerInfos[index].shouldSkip) {
+      alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
+      // we have failed the block reader before
+      return false;
+    }
+    final int parityIndex = index - dataBlkNum;
+    ByteBuffer buf = dfsStripedInputStream.getParityBuffer().duplicate();
+    buf.position(cellSize * parityIndex);
+    buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
+    decodeInputs[index] =
+        new ECChunk(buf.slice(), 0, (int) alignedStripe.range.spanInBlock);
+    alignedStripe.chunks[index] =
+        new StripingChunk(decodeInputs[index].getBuffer());
+    return true;
+  }
+
+  /**
+   * Finalize the decode inputs and run the decoder.
+   */
+  @Override
+  void decode() {
+    finalizeDecodeInputs();
+    decodeAndFillBuffer(false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
new file mode 100644
index 0000000..5518752
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
+import org.apache.hadoop.io.erasurecode.ECChunk;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+
+/**
+ * The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}.
+ * Note that an {@link StripedBlockUtil.AlignedStripe} may cross multiple
+ * stripes with cellSize width.
+ */
+abstract class StripeReader {
+
+  static class ReaderRetryPolicy {
+    private int fetchEncryptionKeyTimes = 1;
+    private int fetchTokenTimes = 1;
+
+    void refetchEncryptionKey() {
+      fetchEncryptionKeyTimes--;
+    }
+
+    void refetchToken() {
+      fetchTokenTimes--;
+    }
+
+    boolean shouldRefetchEncryptionKey() {
+      return fetchEncryptionKeyTimes > 0;
+    }
+
+    boolean shouldRefetchToken() {
+      return fetchTokenTimes > 0;
+    }
+  }
+
+  static class BlockReaderInfo {
+    final BlockReader reader;
+    final DatanodeInfo datanode;
+    /**
+     * when initializing block readers, their starting offsets are set to the
+     * same number: the smallest internal block offsets among all the readers.
+     * This is because it is possible that for some internal blocks we have to
+     * read "backwards" for decoding purpose. We thus use this offset array to
+     * track offsets for all the block readers so that we can skip data if
+     * necessary.
+     */
+    long blockReaderOffset;
+    /**
+     * We use this field to indicate whether we should use this reader. In case
+     * we hit any issue with this reader, we set this field to true and avoid
+     * using it for the next stripe.
+     */
+    boolean shouldSkip = false;
+
+    BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
+      this.reader = reader;
+      this.datanode = dn;
+      this.blockReaderOffset = offset;
+    }
+
+    void setOffset(long offset) {
+      this.blockReaderOffset = offset;
+    }
+
+    void skip() {
+      this.shouldSkip = true;
+    }
+  }
+
+  protected final Map<Future<Void>, Integer> futures = new HashMap<>();
+  protected final AlignedStripe alignedStripe;
+  protected final CompletionService<Void> service;
+  protected final LocatedBlock[] targetBlocks;
+  protected final CorruptedBlocks corruptedBlocks;
+  protected final BlockReaderInfo[] readerInfos;
+  protected final ErasureCodingPolicy ecPolicy;
+  protected final short dataBlkNum;
+  protected final short parityBlkNum;
+  protected final int cellSize;
+  protected final RawErasureDecoder decoder;
+  protected final DFSStripedInputStream dfsStripedInputStream;
+
+  protected ECChunk[] decodeInputs;
+
+  StripeReader(AlignedStripe alignedStripe,
+      ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks,
+      BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks,
+      RawErasureDecoder decoder,
+      DFSStripedInputStream dfsStripedInputStream) {
+    this.alignedStripe = alignedStripe;
+    this.ecPolicy = ecPolicy;
+    this.dataBlkNum = (short)ecPolicy.getNumDataUnits();
+    this.parityBlkNum = (short)ecPolicy.getNumParityUnits();
+    this.cellSize = ecPolicy.getCellSize();
+    this.targetBlocks = targetBlocks;
+    this.readerInfos = readerInfos;
+    this.corruptedBlocks = corruptedBlocks;
+    this.decoder = decoder;
+    this.dfsStripedInputStream = dfsStripedInputStream;
+
+    service = new ExecutorCompletionService<>(
+            dfsStripedInputStream.getStripedReadsThreadPool());
+  }
+
+  /**
+   * Prepare all the data chunks.
+   */
+  abstract void prepareDecodeInputs();
+
+  /**
+   * Prepare the parity chunk and block reader if necessary.
+   */
+  abstract boolean prepareParityChunk(int index);
+
+  /**
+   * Decode to get the missing data.
+   */
+  abstract void decode();
+
+  /**
+   * The default close does nothing.
+   */
+  void close() {
+  }
+
+  void updateState4SuccessRead(StripingChunkReadResult result) {
+    Preconditions.checkArgument(
+        result.state == StripingChunkReadResult.SUCCESSFUL);
+    readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
+        + alignedStripe.getSpanInBlock());
+  }
+
+  private void checkMissingBlocks() throws IOException {
+    if (alignedStripe.missingChunksNum > parityBlkNum) {
+      clearFutures();
+      throw new IOException(alignedStripe.missingChunksNum
+          + " missing blocks, the stripe is: " + alignedStripe
+          + "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
+    }
+  }
+
+  /**
+   * We need decoding. Thus go through all the data chunks and make sure we
+   * submit read requests for all of them.
+   */
+  private void readDataForDecoding() throws IOException {
+    prepareDecodeInputs();
+    for (int i = 0; i < dataBlkNum; i++) {
+      Preconditions.checkNotNull(alignedStripe.chunks[i]);
+      if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
+        if (!readChunk(targetBlocks[i], i)) {
+          alignedStripe.missingChunksNum++;
+        }
+      }
+    }
+    checkMissingBlocks();
+  }
+
+  void readParityChunks(int num) throws IOException {
+    for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
+         i++) {
+      if (alignedStripe.chunks[i] == null) {
+        if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
+          j++;
+        } else {
+          alignedStripe.missingChunksNum++;
+        }
+      }
+    }
+    checkMissingBlocks();
+  }
+
+  /**
+   * Builds the read strategies that receive the data of one chunk: a single
+   * strategy around the chunk's ByteBuffer when the chunk uses one, otherwise
+   * one strategy per slice of the chunk buffer, in slice order.
+   *
+   * @param chunk the chunk whose target buffers are wrapped
+   * @return the strategies to fill, in order
+   */
+  private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
+    if (!chunk.useByteBuffer()) {
+      final int sliceCount = chunk.getChunkBuffer().getSlices().size();
+      final ByteBufferStrategy[] perSlice = new ByteBufferStrategy[sliceCount];
+      for (int idx = 0; idx < sliceCount; idx++) {
+        perSlice[idx] = new ByteBufferStrategy(
+            chunk.getChunkBuffer().getSlice(idx),
+            dfsStripedInputStream.getReadStatistics(),
+            dfsStripedInputStream.getDFSClient());
+      }
+      return perSlice;
+    }
+
+    final ByteBufferStrategy whole = new ByteBufferStrategy(
+        chunk.getByteBuffer(), dfsStripedInputStream.getReadStatistics(),
+        dfsStripedInputStream.getDFSClient());
+    return new ByteBufferStrategy[]{whole};
+  }
+
+  /**
+   * Reads from the block reader through the given strategy until the
+   * strategy's target length has been read.
+   *
+   * A checksum failure is logged, recorded in {@code corruptedBlocks} and
+   * rethrown; any other IOException is logged and rethrown.
+   *
+   * @param blockReader the reader to pull data from
+   * @param currentNode the datanode being read, used for logging and for
+   *          corrupt-block bookkeeping
+   * @param strategy the strategy receiving the data
+   * @param currentBlock the block being read
+   * @return the number of bytes read
+   * @throws IOException if the reader reports end of stream early or fails
+   */
+  private int readToBuffer(BlockReader blockReader,
+      DatanodeInfo currentNode, ByteBufferStrategy strategy,
+      ExtendedBlock currentBlock) throws IOException {
+    final int wanted = strategy.getTargetLength();
+    int total = 0;
+    try {
+      while (total < wanted) {
+        final int n = strategy.readFromBlock(blockReader);
+        if (n < 0) {
+          throw new IOException("Unexpected EOS from the reader");
+        }
+        total += n;
+      }
+      return total;
+    } catch (ChecksumException checksumFailure) {
+      DFSClient.LOG.warn("Found Checksum error for "
+          + currentBlock + " from " + currentNode
+          + " at " + checksumFailure.getPos());
+      // we want to remember which block replicas we have tried
+      corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
+      throw checksumFailure;
+    } catch (IOException readFailure) {
+      DFSClient.LOG.warn("Exception while reading from "
+          + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
+          + currentNode, readFailure);
+      throw readFailure;
+    }
+  }
+
+  /**
+   * Creates the task that reads one chunk: it skips the reader forward from
+   * its current offset to the target offset when they differ, then fills each
+   * strategy in order.
+   *
+   * @param reader the block reader; the task fails with an IOException when
+   *          it is null
+   * @param datanode the datanode being read
+   * @param currentReaderOffset the offset the reader is currently at
+   * @param targetReaderOffset the offset the read has to start from; must not
+   *          be smaller than {@code currentReaderOffset}
+   * @param strategies the strategies to fill
+   * @param currentBlock the block being read
+   * @return the read task, which always yields null
+   */
+  private Callable<Void> readCells(final BlockReader reader,
+      final DatanodeInfo datanode, final long currentReaderOffset,
+      final long targetReaderOffset, final ByteBufferStrategy[] strategies,
+      final ExtendedBlock currentBlock) {
+    return () -> {
+      // reader can be null if getBlockReaderWithRetry failed or
+      // the reader hit exception before
+      if (reader == null) {
+        throw new IOException("The BlockReader is null. " +
+            "The BlockReader creation failed or the reader hit exception.");
+      }
+      Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
+      final long gap = targetReaderOffset - currentReaderOffset;
+      if (currentReaderOffset < targetReaderOffset) {
+        final long skipped = reader.skip(gap);
+        Preconditions.checkState(skipped == gap);
+      }
+
+      for (int idx = 0; idx < strategies.length; idx++) {
+        readToBuffer(reader, datanode, strategies[idx], currentBlock);
+      }
+      return null;
+    };
+  }
+
+  /**
+   * Submits an asynchronous read for one chunk of the aligned stripe.
+   *
+   * The chunk is marked MISSING and nothing is submitted when the block is
+   * null, when no reader info exists for the index and a block reader cannot
+   * be created, or when the existing reader info is flagged to be skipped.
+   * Otherwise the chunk becomes PENDING and the read future is registered
+   * under the chunk index.
+   *
+   * @param block the internal block to read from, may be null
+   * @param chunkIndex the index of the chunk within the stripe
+   * @return true if a read was submitted, false if the chunk is missing
+   * @throws IOException if raised while creating the block reader
+   */
+  boolean readChunk(final LocatedBlock block, int chunkIndex)
+      throws IOException {
+    final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+
+    final boolean unavailable;
+    if (block == null) {
+      unavailable = true;
+    } else if (readerInfos[chunkIndex] == null) {
+      unavailable = !dfsStripedInputStream.createBlockReader(block,
+          alignedStripe.getOffsetInBlock(), targetBlocks,
+          readerInfos, chunkIndex);
+    } else {
+      unavailable = readerInfos[chunkIndex].shouldSkip;
+    }
+    if (unavailable) {
+      chunk.state = StripingChunk.MISSING;
+      return false;
+    }
+
+    chunk.state = StripingChunk.PENDING;
+    final Future<Void> request = service.submit(
+        readCells(readerInfos[chunkIndex].reader,
+            readerInfos[chunkIndex].datanode,
+            readerInfos[chunkIndex].blockReaderOffset,
+            alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
+            block.getBlock()));
+    futures.put(request, chunkIndex);
+    return true;
+  }
+
+  /**
+   * Read the whole stripe, decoding if necessary.
+   *
+   * Reads are first submitted for every data chunk that is present and not
+   * ALLZERO. If any of them cannot be started, the remaining data chunks and
+   * parity chunks are requested as well. The submitted reads are then drained
+   * one by one: a successful read marks its chunk FETCHED, and once
+   * {@code dataBlkNum} chunks have been fetched the remaining reads are
+   * cancelled; a failed read marks its chunk MISSING, closes its reader and
+   * schedules additional data/parity reads. Finally, if any chunk is missing,
+   * {@code decode()} is invoked.
+   *
+   * @throws IOException if more chunks are missing than
+   *           {@code parityBlkNum}, or if raised while submitting a read
+   * @throws InterruptedIOException if the thread is interrupted while waiting
+   *           for a read to complete; the original InterruptedException is
+   *           attached as the cause and no decoding is attempted
+   */
+  void readStripe() throws IOException {
+    for (int i = 0; i < dataBlkNum; i++) {
+      if (alignedStripe.chunks[i] != null &&
+          alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+        if (!readChunk(targetBlocks[i], i)) {
+          alignedStripe.missingChunksNum++;
+        }
+      }
+    }
+    // Some reads could not be started at this stage. Thus we need to read
+    // the full data stripe plus as many parity chunks as chunks are missing.
+    if (alignedStripe.missingChunksNum > 0) {
+      checkMissingBlocks();
+      readDataForDecoding();
+      // read parity chunks
+      readParityChunks(alignedStripe.missingChunksNum);
+    }
+    // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
+
+    // Drain the submitted reads until none is left or enough chunks have
+    // been fetched.
+    while (!futures.isEmpty()) {
+      try {
+        StripingChunkReadResult r = StripedBlockUtil
+            .getNextCompletedStripedRead(service, futures, 0);
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
+              + alignedStripe);
+        }
+        StripingChunk returnedChunk = alignedStripe.chunks[r.index];
+        Preconditions.checkNotNull(returnedChunk);
+        Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
+
+        if (r.state == StripingChunkReadResult.SUCCESSFUL) {
+          returnedChunk.state = StripingChunk.FETCHED;
+          alignedStripe.fetchedChunksNum++;
+          updateState4SuccessRead(r);
+          if (alignedStripe.fetchedChunksNum == dataBlkNum) {
+            // Enough chunks are available; the other reads are not needed.
+            clearFutures();
+            break;
+          }
+        } else {
+          returnedChunk.state = StripingChunk.MISSING;
+          // close the corresponding reader
+          dfsStripedInputStream.closeReader(readerInfos[r.index]);
+
+          // Remember the count before this failure so that only the newly
+          // missing chunks are compensated with parity reads below.
+          final int missing = alignedStripe.missingChunksNum;
+          alignedStripe.missingChunksNum++;
+          checkMissingBlocks();
+
+          readDataForDecoding();
+          readParityChunks(alignedStripe.missingChunksNum - missing);
+        }
+      } catch (InterruptedException ie) {
+        String err = "Read request interrupted";
+        DFSClient.LOG.error(err);
+        clearFutures();
+        // Don't decode if read interrupted. InterruptedIOException has no
+        // cause-taking constructor, so attach the cause explicitly instead
+        // of dropping it.
+        InterruptedIOException iioe = new InterruptedIOException(err);
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
+
+    if (alignedStripe.missingChunksNum > 0) {
+      decode();
+    }
+  }
+
+  /**
+   * Finalizes the decode input buffers; to be called once all pending
+   * requests have returned. A fetched chunk may live in the original
+   * application buffer rather than in a prepared decode input, in which case
+   * its data is copied into the decode input; a fetched chunk that uses a
+   * plain ByteBuffer is flipped instead. A chunk beyond the range of the
+   * internal blocks (ALLZERO) has its decode input flagged as all zero bytes.
+   */
+  void finalizeDecodeInputs() {
+    final StripingChunk[] chunks = alignedStripe.chunks;
+    for (int idx = 0; idx < chunks.length; idx++) {
+      final StripingChunk chunk = chunks[idx];
+      if (chunk == null) {
+        continue;
+      }
+      if (chunk.state == StripingChunk.FETCHED) {
+        if (chunk.useChunkBuffer()) {
+          chunk.getChunkBuffer().copyTo(decodeInputs[idx].getBuffer());
+        } else {
+          chunk.getByteBuffer().flip();
+        }
+      } else if (chunk.state == StripingChunk.ALLZERO) {
+        decodeInputs[idx].setAllZero(true);
+      }
+    }
+  }
+
+  /**
+   * Decodes the missing chunks from the prepared decode inputs.
+   *
+   * The decode input of every erased index is taken out of
+   * {@code decodeInputs} (its slot is set to null) and reused as the output
+   * for that index.
+   *
+   * @param fillBuffer whether decoded data should be copied back into the
+   *          chunk buffers of the missing chunks
+   */
+  void decodeAndFillBuffer(boolean fillBuffer) {
+    // Step 1: prepare indices and output buffers for missing data units
+    final int[] erased = prepareErasedIndices();
+    final ECChunk[] outputs = new ECChunk[erased.length];
+    int slot = 0;
+    for (int erasedIdx : erased) {
+      outputs[slot++] = decodeInputs[erasedIdx];
+      decodeInputs[erasedIdx] = null;
+    }
+
+    // Step 2: decode into prepared output buffers
+    decoder.decode(decodeInputs, erased, outputs);
+
+    // Step 3: fill original application buffer with decoded data
+    if (!fillBuffer) {
+      return;
+    }
+    for (int k = 0; k < erased.length; k++) {
+      final StripingChunk chunk = alignedStripe.chunks[erased[k]];
+      if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
+        chunk.getChunkBuffer().copyFrom(outputs[k].getBuffer());
+      }
+    }
+  }
+
+  /**
+   * Collects the indices of all chunks in the stripe that are in the MISSING
+   * state.
+   *
+   * @return the indices of the missing chunks in ascending order; the scratch
+   *         array holds at most {@code parityBlkNum} entries
+   */
+  int[] prepareErasedIndices() {
+    final int[] scratch = new int[parityBlkNum];
+    int count = 0;
+    final StripingChunk[] chunks = alignedStripe.chunks;
+    for (int idx = 0; idx < chunks.length; idx++) {
+      final StripingChunk chunk = chunks[idx];
+      if (chunk == null || chunk.state != StripingChunk.MISSING) {
+        continue;
+      }
+      scratch[count++] = idx;
+    }
+
+    // Trim the scratch array down to the entries actually filled in.
+    return Arrays.copyOf(scratch, count);
+  }
+
+  /**
+   * Cancels every registered read future, without interrupting tasks that
+   * are already running, and then forgets all of them.
+   */
+  void clearFutures() {
+    futures.keySet().forEach(pending -> pending.cancel(false));
+    futures.clear();
+  }
+
+  /**
+   * @return whatever the decoder reports from {@code preferDirectBuffer()}
+   */
+  boolean useDirectBuffer() {
+    final boolean preferDirect = decoder.preferDirectBuffer();
+    return preferDirect;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 4dbbc3d..896ebc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSStripedOutputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -32,7 +31,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.hdfs.DFSStripedOutputStream;
 import org.apache.hadoop.security.token.Token;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,18 +76,6 @@ public class StripedBlockUtil {
       LoggerFactory.getLogger(StripedBlockUtil.class);
 
   /**
-   * Parses a striped block group into individual blocks.
-   * @param bg The striped block group
-   * @param ecPolicy The erasure coding policy
-   * @return An array of the blocks in the group
-   */
-  public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
-                                               ErasureCodingPolicy ecPolicy) {
-    return parseStripedBlockGroup(bg, ecPolicy.getCellSize(),
-        ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
-  }
-
-  /**
    * This method parses a striped block group into individual blocks.
    *
    * @param bg The striped block group
@@ -112,7 +99,7 @@ public class StripedBlockUtil {
   }
 
   /**
-   * This method creates an internal block at the given index of a block group
+   * This method creates an internal block at the given index of a block group.
    *
    * @param idxInReturnedLocs The index in the stored locations in the
    *                          {@link LocatedStripedBlock} object
@@ -169,7 +156,7 @@ public class StripedBlockUtil {
   }
 
   /**
-   * Get the size of an internal block at the given index of a block group
+   * Get the size of an internal block at the given index of a block group.
    *
    * @param dataSize Size of the block group only counting data blocks
    * @param cellSize The size of a striping cell
@@ -237,7 +224,7 @@ public class StripedBlockUtil {
 
   /**
    * Given a byte's offset in an internal block, calculate the offset in
-   * the block group
+   * the block group.
    */
   public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
       long offsetInBlk, int idxInBlockGroup) {
@@ -248,12 +235,12 @@ public class StripedBlockUtil {
   }
 
   /**
-   * Get the next completed striped read task
+   * Get the next completed striped read task.
    *
-   * @return {@link StripingChunkReadResult} indicating the status of the read task
-   *          succeeded, and the block index of the task. If the method times
-   *          out without getting any completed read tasks, -1 is returned as
-   *          block index.
+   * @return {@link StripingChunkReadResult} indicating the status of the read
+   *          task succeeded, and the block index of the task. If the method
+   *          times out without getting any completed read tasks, -1 is
+   *          returned as block index.
    * @throws InterruptedException
    */
   public static StripingChunkReadResult getNextCompletedStripedRead(
@@ -287,7 +274,7 @@ public class StripedBlockUtil {
 
   /**
    * Get the total usage of the striped blocks, which is the total of data
-   * blocks and parity blocks
+   * blocks and parity blocks.
    *
    * @param numDataBlkBytes
    *          Size of the block group only counting data blocks
@@ -308,91 +295,6 @@ public class StripedBlockUtil {
   }
 
   /**
-   * Initialize the decoding input buffers based on the chunk states in an
-   * {@link AlignedStripe}. For each chunk that was not initially requested,
-   * schedule a new fetch request with the decoding input buffer as transfer
-   * destination.
-   */
-  public static ByteBuffer[] initDecodeInputs(AlignedStripe alignedStripe,
-      int dataBlkNum, int parityBlkNum) {
-    ByteBuffer[] decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
-    for (int i = 0; i < decodeInputs.length; i++) {
-      decodeInputs[i] = ByteBuffer.allocate(
-          (int) alignedStripe.getSpanInBlock());
-    }
-    // read the full data aligned stripe
-    for (int i = 0; i < dataBlkNum; i++) {
-      if (alignedStripe.chunks[i] == null) {
-        alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
-      }
-    }
-    return decodeInputs;
-  }
-
-  /**
-   * Some fetched {@link StripingChunk} might be stored in original application
-   * buffer instead of prepared decode input buffers. Some others are beyond
-   * the range of the internal blocks and should correspond to all zero bytes.
-   * When all pending requests have returned, this method should be called to
-   * finalize decode input buffers.
-   */
-  public static void finalizeDecodeInputs(final ByteBuffer[] decodeInputs,
-                                          AlignedStripe alignedStripe) {
-    for (int i = 0; i < alignedStripe.chunks.length; i++) {
-      final StripingChunk chunk = alignedStripe.chunks[i];
-      if (chunk != null && chunk.state == StripingChunk.FETCHED) {
-        if (chunk.useChunkBuffer()) {
-          chunk.getChunkBuffer().copyTo(decodeInputs[i]);
-        } else {
-          chunk.getByteBuffer().flip();
-        }
-      } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
-        //ZERO it. Will be better handled in other following issue.
-        byte[] emptyBytes = new byte[decodeInputs[i].limit()];
-        decodeInputs[i].put(emptyBytes);
-        decodeInputs[i].flip();
-      } else {
-        decodeInputs[i] = null;
-      }
-    }
-  }
-
-  /**
-   * Decode based on the given input buffers and erasure coding policy.
-   */
-  public static void decodeAndFillBuffer(final ByteBuffer[] decodeInputs,
-      AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
-      RawErasureDecoder decoder) {
-    // Step 1: prepare indices and output buffers for missing data units
-    int[] decodeIndices = new int[parityBlkNum];
-    int pos = 0;
-    for (int i = 0; i < dataBlkNum; i++) {
-      if (alignedStripe.chunks[i] != null &&
-          alignedStripe.chunks[i].state == StripingChunk.MISSING){
-        decodeIndices[pos++] = i;
-      }
-    }
-    decodeIndices = Arrays.copyOf(decodeIndices, pos);
-    ByteBuffer[] decodeOutputs = new ByteBuffer[decodeIndices.length];
-    for (int i = 0; i < decodeOutputs.length; i++) {
-      decodeOutputs[i] = ByteBuffer.allocate(
-          (int) alignedStripe.getSpanInBlock());
-    }
-
-    // Step 2: decode into prepared output buffers
-    decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
-
-    // Step 3: fill original application buffer with decoded data
-    for (int i = 0; i < decodeIndices.length; i++) {
-      int missingBlkIdx = decodeIndices[i];
-      StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
-      if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
-        chunk.getChunkBuffer().copyFrom(decodeOutputs[i]);
-      }
-    }
-  }
-
-  /**
    * Similar functionality with {@link #divideByteRangeIntoStripes}, but is used
    * by stateful read and uses ByteBuffer as reading target buffer. Besides the
    * read range is within a single stripe thus the calculation logic is simpler.
@@ -485,7 +387,7 @@ public class StripedBlockUtil {
   /**
    * Map the logical byte range to a set of inclusive {@link StripingCell}
    * instances, each representing the overlap of the byte range to a cell
-   * used by {@link DFSStripedOutputStream} in encoding
+   * used by {@link DFSStripedOutputStream} in encoding.
    */
   @VisibleForTesting
   private static StripingCell[] getStripingCellsOfByteRange(
@@ -530,7 +432,7 @@ public class StripedBlockUtil {
     int dataBlkNum = ecPolicy.getNumDataUnits();
     int parityBlkNum = ecPolicy.getNumParityUnits();
 
-    VerticalRange ranges[] = new VerticalRange[dataBlkNum + parityBlkNum];
+    VerticalRange[] ranges = new VerticalRange[dataBlkNum + parityBlkNum];
 
     long earliestStart = Long.MAX_VALUE;
     long latestEnd = -1;
@@ -675,7 +577,7 @@ public class StripedBlockUtil {
   @VisibleForTesting
   static class StripingCell {
     final ErasureCodingPolicy ecPolicy;
-    /** Logical order in a block group, used when doing I/O to a block group */
+    /** Logical order in a block group, used when doing I/O to a block group. */
     final int idxInBlkGroup;
     final int idxInInternalBlk;
     final int idxInStripe;
@@ -738,7 +640,7 @@ public class StripedBlockUtil {
    */
   public static class AlignedStripe {
     public VerticalRange range;
-    /** status of each chunk in the stripe */
+    /** status of each chunk in the stripe. */
     public final StripingChunk[] chunks;
     public int fetchedChunksNum = 0;
     public int missingChunksNum = 0;
@@ -790,9 +692,9 @@ public class StripedBlockUtil {
    * +-----+
    */
   public static class VerticalRange {
-    /** start offset in the block group (inclusive) */
+    /** start offset in the block group (inclusive). */
     public long offsetInBlock;
-    /** length of the stripe range */
+    /** length of the stripe range. */
     public long spanInBlock;
 
     public VerticalRange(long offsetInBlock, long length) {
@@ -801,7 +703,7 @@ public class StripedBlockUtil {
       this.spanInBlock = length;
     }
 
-    /** whether a position is in the range */
+    /** whether a position is in the range. */
     public boolean include(long pos) {
       return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
     }
@@ -915,7 +817,7 @@ public class StripedBlockUtil {
     /**
      *  Note: target will be ready-to-read state after the call.
      */
-    void copyTo(ByteBuffer target) {
+    public void copyTo(ByteBuffer target) {
       for (ByteBuffer slice : slices) {
         slice.flip();
         target.put(slice);
@@ -923,7 +825,7 @@ public class StripedBlockUtil {
       target.flip();
     }
 
-    void copyFrom(ByteBuffer src) {
+    public void copyFrom(ByteBuffer src) {
       ByteBuffer tmp;
       int len;
       for (ByteBuffer slice : slices) {
@@ -970,6 +872,28 @@ public class StripedBlockUtil {
     }
   }
 
+  /** Used to indicate the buffered data's range in the block group. */
+  public static class StripeRange {
+    /** start offset in the block group (inclusive). */
+    final long offsetInBlock;
+    /** length of the stripe range. */
+    final long length;
+
+    /**
+     * @param offsetInBlock start offset of the range, must not be negative
+     * @param length length of the range, must not be negative
+     */
+    public StripeRange(long offsetInBlock, long length) {
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      this.offsetInBlock = offsetInBlock;
+      this.length = length;
+    }
+
+    /** whether a position is in the range (start inclusive, end exclusive). */
+    public boolean include(long pos) {
+      final long end = offsetInBlock + length;
+      return offsetInBlock <= pos && pos < end;
+    }
+
+    /** @return the length of the range. */
+    public long getLength() {
+      return length;
+    }
+  }
+
   /**
    * Check if the information such as IDs and generation stamps in block-i
    * match the block group.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
index 7d9d7dc..999eb1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
@@ -283,5 +283,4 @@ public class TestStripedBlockUtil {
       }
     }
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to