[06/50] [abbrv] hadoop git commit: HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek. Contributed by Thomas Marquardt

2017-08-11 Thread wangda
HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek.
Contributed by Thomas Marquardt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d91b7a84
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d91b7a84
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d91b7a84

Branch: refs/heads/YARN-5881
Commit: d91b7a8451489f97bdde928cea774764155cfe03
Parents: 024c3ec
Author: Steve Loughran 
Authored: Sun Aug 6 20:19:23 2017 +0100
Committer: Steve Loughran 
Committed: Sun Aug 6 20:19:23 2017 +0100

--
 .../hadoop/fs/azure/BlockBlobInputStream.java   | 91 +++-
 .../fs/azure/TestBlockBlobInputStream.java  | 85 --
 2 files changed, 150 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91b7a84/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
--
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
index 5542415..c37b2be 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
@@ -43,11 +43,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   private InputStream blobInputStream = null;
   private int minimumReadSizeInBytes = 0;
   private long streamPositionAfterLastRead = -1;
+  // position of next network read within stream
   private long streamPosition = 0;
+  // length of stream
   private long streamLength = 0;
   private boolean closed = false;
+  // internal buffer, re-used for performance optimization
   private byte[] streamBuffer;
+  // zero-based offset within streamBuffer of current read position
   private int streamBufferPosition;
+  // length of data written to streamBuffer, streamBuffer may be larger
   private int streamBufferLength;
 
   /**
@@ -82,6 +87,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   }
 
   /**
+   * Reset the internal stream buffer but do not release the memory.
+   * The buffer can be reused to avoid frequent memory allocations of
+   * a large buffer.
+   */
+  private void resetStreamBuffer() {
+streamBufferPosition = 0;
+streamBufferLength = 0;
+  }
+
+  /**
* Gets the read position of the stream.
* @return the zero-based byte offset of the read position.
* @throws IOException IO failure
@@ -89,7 +104,9 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   @Override
   public synchronized long getPos() throws IOException {
 checkState();
-return streamPosition;
+return (streamBuffer != null)
+? streamPosition - streamBufferLength + streamBufferPosition
+: streamPosition;
   }
 
   /**
@@ -107,21 +124,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   throw new EOFException(
   FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
 }
-if (pos == getPos()) {
+
+// calculate offset between the target and current position in the stream
+long offset = pos - getPos();
+
+if (offset == 0) {
   // no=op, no state change
   return;
 }
 
+if (offset > 0) {
+  // forward seek, data can be skipped as an optimization
+  if (skip(offset) != offset) {
+throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+  }
+  return;
+}
+
+// reverse seek, offset is negative
 if (streamBuffer != null) {
-  long offset = streamPosition - pos;
-  if (offset > 0 && offset < streamBufferLength) {
-streamBufferPosition = streamBufferLength - (int) offset;
+  if (streamBufferPosition + offset >= 0) {
+// target position is inside the stream buffer,
+// only need to move backwards within the stream buffer
+streamBufferPosition += offset;
   } else {
-streamBufferPosition = streamBufferLength;
+// target position is outside the stream buffer,
+// need to reset stream buffer and move position for next network read
+resetStreamBuffer();
+streamPosition = pos;
   }
+} else {
+  streamPosition = pos;
 }
 
-streamPosition = pos;
 // close BlobInputStream after seek is invoked because BlobInputStream
 // does not support seek
 closeBlobInputStream();
@@ -189,8 +224,7 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
 streamBuffer = new byte[(int) 

[21/51] [abbrv] hadoop git commit: HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek. Contributed by Thomas Marquardt

2017-08-09 Thread stevel
HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek.
Contributed by Thomas Marquardt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d91b7a84
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d91b7a84
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d91b7a84

Branch: refs/heads/HADOOP-13345
Commit: d91b7a8451489f97bdde928cea774764155cfe03
Parents: 024c3ec
Author: Steve Loughran 
Authored: Sun Aug 6 20:19:23 2017 +0100
Committer: Steve Loughran 
Committed: Sun Aug 6 20:19:23 2017 +0100

--
 .../hadoop/fs/azure/BlockBlobInputStream.java   | 91 +++-
 .../fs/azure/TestBlockBlobInputStream.java  | 85 --
 2 files changed, 150 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91b7a84/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
--
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
index 5542415..c37b2be 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
@@ -43,11 +43,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   private InputStream blobInputStream = null;
   private int minimumReadSizeInBytes = 0;
   private long streamPositionAfterLastRead = -1;
+  // position of next network read within stream
   private long streamPosition = 0;
+  // length of stream
   private long streamLength = 0;
   private boolean closed = false;
+  // internal buffer, re-used for performance optimization
   private byte[] streamBuffer;
+  // zero-based offset within streamBuffer of current read position
   private int streamBufferPosition;
+  // length of data written to streamBuffer, streamBuffer may be larger
   private int streamBufferLength;
 
   /**
@@ -82,6 +87,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   }
 
   /**
+   * Reset the internal stream buffer but do not release the memory.
+   * The buffer can be reused to avoid frequent memory allocations of
+   * a large buffer.
+   */
+  private void resetStreamBuffer() {
+streamBufferPosition = 0;
+streamBufferLength = 0;
+  }
+
+  /**
* Gets the read position of the stream.
* @return the zero-based byte offset of the read position.
* @throws IOException IO failure
@@ -89,7 +104,9 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   @Override
   public synchronized long getPos() throws IOException {
 checkState();
-return streamPosition;
+return (streamBuffer != null)
+? streamPosition - streamBufferLength + streamBufferPosition
+: streamPosition;
   }
 
   /**
@@ -107,21 +124,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   throw new EOFException(
   FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
 }
-if (pos == getPos()) {
+
+// calculate offset between the target and current position in the stream
+long offset = pos - getPos();
+
+if (offset == 0) {
   // no=op, no state change
   return;
 }
 
+if (offset > 0) {
+  // forward seek, data can be skipped as an optimization
+  if (skip(offset) != offset) {
+throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+  }
+  return;
+}
+
+// reverse seek, offset is negative
 if (streamBuffer != null) {
-  long offset = streamPosition - pos;
-  if (offset > 0 && offset < streamBufferLength) {
-streamBufferPosition = streamBufferLength - (int) offset;
+  if (streamBufferPosition + offset >= 0) {
+// target position is inside the stream buffer,
+// only need to move backwards within the stream buffer
+streamBufferPosition += offset;
   } else {
-streamBufferPosition = streamBufferLength;
+// target position is outside the stream buffer,
+// need to reset stream buffer and move position for next network read
+resetStreamBuffer();
+streamPosition = pos;
   }
+} else {
+  streamPosition = pos;
 }
 
-streamPosition = pos;
 // close BlobInputStream after seek is invoked because BlobInputStream
 // does not support seek
 closeBlobInputStream();
@@ -189,8 +224,7 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
 streamBuffer = new byte[(int)