HDFS-13540. DFSStripedInputStream should only allocate new buffers when reading. Contributed by Xiao Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34e8b9f9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34e8b9f9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34e8b9f9 Branch: refs/heads/HDDS-48 Commit: 34e8b9f9a86fb03156861482643fba11bdee1dd4 Parents: fed2bef Author: Sammi Chen <sammi.c...@intel.com> Authored: Wed May 23 19:10:09 2018 +0800 Committer: Sammi Chen <sammi.c...@intel.com> Committed: Wed May 23 19:10:09 2018 +0800 ---------------------------------------------------------------------- .../apache/hadoop/io/ElasticByteBufferPool.java | 12 ++++++ .../hadoop/hdfs/DFSStripedInputStream.java | 12 +++--- .../hadoop/hdfs/TestDFSStripedInputStream.java | 45 ++++++++++++++++++++ 3 files changed, 64 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java index 023f37f..9dd7771 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java @@ -116,4 +116,16 @@ public final class ElasticByteBufferPool implements ByteBufferPool { // poor granularity. } } + + /** + * Get the size of the buffer pool, for the specified buffer type. 
+ * + * @param direct Whether the size is returned for direct buffers + * @return The size + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public int size(boolean direct) { + return getBufferTree(direct).size(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index f3b16e0..5557a50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -116,12 +116,14 @@ public class DFSStripedInputStream extends DFSInputStream { return decoder.preferDirectBuffer(); } - void resetCurStripeBuffer() { - if (curStripeBuf == null) { + private void resetCurStripeBuffer(boolean shouldAllocateBuf) { + if (shouldAllocateBuf && curStripeBuf == null) { curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(), cellSize * dataBlkNum); } - curStripeBuf.clear(); + if (curStripeBuf != null) { + curStripeBuf.clear(); + } curStripeRange = new StripeRange(0, 0); } @@ -206,7 +208,7 @@ public class DFSStripedInputStream extends DFSInputStream { */ @Override protected void closeCurrentBlockReaders() { - resetCurStripeBuffer(); + resetCurStripeBuffer(false); if (blockReaders == null || blockReaders.length == 0) { return; } @@ -296,7 +298,7 @@ public class DFSStripedInputStream extends DFSInputStream { */ private void readOneStripe(CorruptedBlocks corruptedBlocks) throws IOException { - resetCurStripeBuffer(); + resetCurStripeBuffer(true); // compute stripe range based on pos final long offsetInBlockGroup = 
getOffsetInBlockGroup(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index cdebee0..422746e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ErasureCodeNative; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; @@ -529,4 +530,48 @@ public class TestDFSStripedInputStream { } } } + + @Test + public void testCloseDoesNotAllocateNewBuffer() throws Exception { + final int numBlocks = 2; + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + stripesPerBlock, false, ecPolicy); + try (DFSInputStream in = fs.getClient().open(filePath.toString())) { + assertTrue(in instanceof DFSStripedInputStream); + final DFSStripedInputStream stream = (DFSStripedInputStream) in; + final ElasticByteBufferPool ebbp = + (ElasticByteBufferPool) stream.getBufferPool(); + // first clear existing pool + LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: " + + ebbp.size(false)); + emptyBufferPoolForCurrentPolicy(ebbp, true); + emptyBufferPoolForCurrentPolicy(ebbp, false); + final int 
startSizeDirect = ebbp.size(true); + final int startSizeIndirect = ebbp.size(false); + // close should not allocate new buffers in the pool. + stream.close(); + assertEquals(startSizeDirect, ebbp.size(true)); + assertEquals(startSizeIndirect, ebbp.size(false)); + } + } + + /** + * Empties the pool for the specified buffer type, for the current ecPolicy. + * <p> + * Note that {@link #ecPolicy} may change for different test cases in + * {@link TestDFSStripedInputStreamWithRandomECPolicy}. + */ + private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp, + boolean direct) { + int size; + while ((size = ebbp.size(direct)) != 0) { + ebbp.getBuffer(direct, + ecPolicy.getCellSize() * ecPolicy.getNumDataUnits()); + if (size == ebbp.size(direct)) { + // if getBuffer didn't decrease size, it means the pool for the buffer + // corresponding to current ecPolicy is empty + break; + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org