Repository: hadoop Updated Branches: refs/heads/branch-2 2cbac36fd -> 701b96ca8 refs/heads/trunk f0412de1c -> 826267f78
HADOOP-11570. S3AInputStream.close() downloads the remaining bytes of the object from S3. (Dan Hecht via stevel). Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/701b96ca Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/701b96ca Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/701b96ca Branch: refs/heads/branch-2 Commit: 701b96ca8e9a89d51ee47a470e524307fea3a035 Parents: 2cbac36 Author: Steve Loughran <ste...@apache.org> Authored: Tue Feb 17 16:36:32 2015 +0000 Committer: Steve Loughran <ste...@apache.org> Committed: Tue Feb 17 16:36:32 2015 +0000 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ .../apache/hadoop/fs/s3a/S3AInputStream.java | 20 ++++++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/701b96ca/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 4871f45..a8b38ed 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -542,6 +542,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb) + HADOOP-11570. S3AInputStream.close() downloads the remaining bytes of + the object from S3. (Dan Hecht via stevel). 
+ Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/701b96ca/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 4c56b82..685026e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -37,14 +37,13 @@ public class S3AInputStream extends FSInputStream { private long pos; private boolean closed; private S3ObjectInputStream wrappedStream; - private S3Object wrappedObject; private FileSystem.Statistics stats; private AmazonS3Client client; private String bucket; private String key; private long contentLength; public static final Logger LOG = S3AFileSystem.LOG; - + public static final long CLOSE_THRESHOLD = 4096; public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client, FileSystem.Statistics stats) { @@ -55,12 +54,11 @@ public class S3AInputStream extends FSInputStream { this.stats = stats; this.pos = 0; this.closed = false; - this.wrappedObject = null; this.wrappedStream = null; } private void openIfNeeded() throws IOException { - if (wrappedObject == null) { + if (wrappedStream == null) { reopen(0); } } @@ -90,8 +88,7 @@ public class S3AInputStream extends FSInputStream { GetObjectRequest request = new GetObjectRequest(bucket, key); request.setRange(pos, contentLength-1); - wrappedObject = client.getObject(request); - wrappedStream = wrappedObject.getObjectContent(); + wrappedStream = client.getObject(request).getObjectContent(); if (wrappedStream == null) { throw new IOException("Null IO stream"); @@ -192,8 +189,15 @@ public class S3AInputStream extends FSInputStream { 
   public synchronized void close() throws IOException {
     super.close();
     closed = true;
-    if (wrappedObject != null) {
-      wrappedObject.close();
+    if (wrappedStream != null) {
+      if (contentLength - pos <= CLOSE_THRESHOLD) {
+        // Close, rather than abort, so that the http connection can be reused.
+        wrappedStream.close();
+      } else {
+        // Abort, rather than just close, the underlying stream. Otherwise, the
+        // remaining object payload is read from S3 while closing the stream.
+        wrappedStream.abort();
+      }
     }
   }