[06/50] [abbrv] hadoop git commit: HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek. Contributed by Thomas Marquardt
HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek. Contributed by Thomas Marquardt Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d91b7a84 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d91b7a84 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d91b7a84 Branch: refs/heads/YARN-5881 Commit: d91b7a8451489f97bdde928cea774764155cfe03 Parents: 024c3ec Author: Steve Loughran Authored: Sun Aug 6 20:19:23 2017 +0100 Committer: Steve Loughran Committed: Sun Aug 6 20:19:23 2017 +0100 -- .../hadoop/fs/azure/BlockBlobInputStream.java | 91 +++- .../fs/azure/TestBlockBlobInputStream.java | 85 -- 2 files changed, 150 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91b7a84/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java -- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java index 5542415..c37b2be 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java @@ -43,11 +43,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable { private InputStream blobInputStream = null; private int minimumReadSizeInBytes = 0; private long streamPositionAfterLastRead = -1; + // position of next network read within stream private long streamPosition = 0; + // length of stream private long streamLength = 0; private boolean closed = false; + // internal buffer, re-used for performance optimization private byte[] streamBuffer; + // zero-based offset within streamBuffer of current read position private int streamBufferPosition; + // length of data written to streamBuffer, streamBuffer may be larger 
private int streamBufferLength; /** @@ -82,6 +87,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable { } /** + * Reset the internal stream buffer but do not release the memory. + * The buffer can be reused to avoid frequent memory allocations of + * a large buffer. + */ + private void resetStreamBuffer() { +streamBufferPosition = 0; +streamBufferLength = 0; + } + + /** * Gets the read position of the stream. * @return the zero-based byte offset of the read position. * @throws IOException IO failure @@ -89,7 +104,9 @@ final class BlockBlobInputStream extends InputStream implements Seekable { @Override public synchronized long getPos() throws IOException { checkState(); -return streamPosition; +return (streamBuffer != null) +? streamPosition - streamBufferLength + streamBufferPosition +: streamPosition; } /** @@ -107,21 +124,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable { throw new EOFException( FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos); } -if (pos == getPos()) { + +// calculate offset between the target and current position in the stream +long offset = pos - getPos(); + +if (offset == 0) { // no=op, no state change return; } +if (offset > 0) { + // forward seek, data can be skipped as an optimization + if (skip(offset) != offset) { +throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); + } + return; +} + +// reverse seek, offset is negative if (streamBuffer != null) { - long offset = streamPosition - pos; - if (offset > 0 && offset < streamBufferLength) { -streamBufferPosition = streamBufferLength - (int) offset; + if (streamBufferPosition + offset >= 0) { +// target position is inside the stream buffer, +// only need to move backwards within the stream buffer +streamBufferPosition += offset; } else { -streamBufferPosition = streamBufferLength; +// target position is outside the stream buffer, +// need to reset stream buffer and move position for next network read +resetStreamBuffer(); 
+streamPosition = pos; } +} else { + streamPosition = pos; } -streamPosition = pos; // close BlobInputStream after seek is invoked because BlobInputStream // does not support seek closeBlobInputStream(); @@ -189,8 +224,7 @@ final class BlockBlobInputStream extends InputStream implements Seekable { streamBuffer = new byte[(int)
[21/51] [abbrv] hadoop git commit: HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek. Contributed by Thomas Marquardt
HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek. Contributed by Thomas Marquardt Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d91b7a84 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d91b7a84 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d91b7a84 Branch: refs/heads/HADOOP-13345 Commit: d91b7a8451489f97bdde928cea774764155cfe03 Parents: 024c3ec Author: Steve Loughran Authored: Sun Aug 6 20:19:23 2017 +0100 Committer: Steve Loughran Committed: Sun Aug 6 20:19:23 2017 +0100 -- .../hadoop/fs/azure/BlockBlobInputStream.java | 91 +++- .../fs/azure/TestBlockBlobInputStream.java | 85 -- 2 files changed, 150 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91b7a84/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java -- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java index 5542415..c37b2be 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java @@ -43,11 +43,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable { private InputStream blobInputStream = null; private int minimumReadSizeInBytes = 0; private long streamPositionAfterLastRead = -1; + // position of next network read within stream private long streamPosition = 0; + // length of stream private long streamLength = 0; private boolean closed = false; + // internal buffer, re-used for performance optimization private byte[] streamBuffer; + // zero-based offset within streamBuffer of current read position private int streamBufferPosition; + // length of data written to streamBuffer, streamBuffer may be larger 
private int streamBufferLength; /** @@ -82,6 +87,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable { } /** + * Reset the internal stream buffer but do not release the memory. + * The buffer can be reused to avoid frequent memory allocations of + * a large buffer. + */ + private void resetStreamBuffer() { +streamBufferPosition = 0; +streamBufferLength = 0; + } + + /** * Gets the read position of the stream. * @return the zero-based byte offset of the read position. * @throws IOException IO failure @@ -89,7 +104,9 @@ final class BlockBlobInputStream extends InputStream implements Seekable { @Override public synchronized long getPos() throws IOException { checkState(); -return streamPosition; +return (streamBuffer != null) +? streamPosition - streamBufferLength + streamBufferPosition +: streamPosition; } /** @@ -107,21 +124,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable { throw new EOFException( FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos); } -if (pos == getPos()) { + +// calculate offset between the target and current position in the stream +long offset = pos - getPos(); + +if (offset == 0) { // no=op, no state change return; } +if (offset > 0) { + // forward seek, data can be skipped as an optimization + if (skip(offset) != offset) { +throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); + } + return; +} + +// reverse seek, offset is negative if (streamBuffer != null) { - long offset = streamPosition - pos; - if (offset > 0 && offset < streamBufferLength) { -streamBufferPosition = streamBufferLength - (int) offset; + if (streamBufferPosition + offset >= 0) { +// target position is inside the stream buffer, +// only need to move backwards within the stream buffer +streamBufferPosition += offset; } else { -streamBufferPosition = streamBufferLength; +// target position is outside the stream buffer, +// need to reset stream buffer and move position for next network read +resetStreamBuffer(); 
+streamPosition = pos; } +} else { + streamPosition = pos; } -streamPosition = pos; // close BlobInputStream after seek is invoked because BlobInputStream // does not support seek closeBlobInputStream(); @@ -189,8 +224,7 @@ final class BlockBlobInputStream extends InputStream implements Seekable { streamBuffer = new byte[(int)