This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 7a4b3d4 HADOOP-15870. S3AInputStream.remainingInFile should use nextReadPos. 7a4b3d4 is described below commit 7a4b3d42c4e36e468c2a46fd48036a6fed547853 Author: lqjacklee <lqjack...@126.com> AuthorDate: Thu Oct 10 21:58:42 2019 +0100 HADOOP-15870. S3AInputStream.remainingInFile should use nextReadPos. Contributed by lqjacklee. Change-Id: I32bb00a683102e7ff8ff8ce0b8d9c3195ca7381c --- .../site/markdown/filesystem/fsdatainputstream.md | 53 +++++++++++++++ .../fs/contract/AbstractContractSeekTest.java | 75 ++++++++++++++++++++-- .../org/apache/hadoop/fs/s3a/S3AInputStream.java | 29 ++++++--- .../fs/contract/s3a/ITestS3AContractSeek.java | 2 +- 4 files changed, 143 insertions(+), 16 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index 0906964..b8f9e87 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -119,6 +119,59 @@ Return the data at the current position. else result = -1 +### <a name="InputStream.available"></a> `InputStream.available()` + +Returns the number of bytes "estimated" to be readable on a stream before `read()` +blocks on any IO (i.e. the thread is potentially suspended for some time). + +That is: for all values `v` returned by `available()`, `read(buffer, 0, v)` +should not block. + +#### Postconditions + +```python +if len(data) == 0: + result = 0 + +elif pos >= len(data): + result = 0 + +else: + d = "the amount of data known to be already buffered/cached locally" + result = max(1, d) # optional but recommended: see below. +``` + +As `0` is a number which always meets this condition, it is nominally +possible for an implementation to simply return `0`. 
However, this is not +considered useful, and some applications/libraries expect a positive number. + +#### The GZip problem. + +[JDK-7036144](http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144), +"GZIPInputStream readTrailer uses faulty available() test for end-of-stream" +discusses how the JDK's GZip code uses `available()` to detect an EOF, +in a loop similar to the following + +```java +while(instream.available() > 0) { + process(instream.read()); +} +``` + +The correct loop would have been: + +```java +int r; +while((r=instream.read()) >= 0) { + process(r); +} +``` + +If `available()` ever returns 0, then the gzip loop halts prematurely. + +For this reason, implementations *should* return a value >=1, even +if it breaks the requirement of `available()` returning the amount guaranteed +not to block on reads. ### <a name="InputStream.read.buffer[]"></a> `InputStream.read(buffer[], offset, length)` diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java index ca8e4a0..db36916 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java @@ -32,6 +32,7 @@ import java.io.EOFException; import java.io.IOException; import java.util.Random; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; @@ -99,14 +100,18 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas describe("seek and read a 0 byte file"); instream = 
getFileSystem().open(zeroByteFile); assertEquals(0, instream.getPos()); + assertAvailableIsZero(instream); //expect initial read to fai; int result = instream.read(); assertMinusOne("initial byte read", result); + assertAvailableIsZero(instream); byte[] buffer = new byte[1]; //expect that seek to 0 works instream.seek(0); + assertAvailableIsZero(instream); //reread, expect same exception result = instream.read(); + assertAvailableIsZero(instream); assertMinusOne("post-seek byte read", result); result = instream.read(buffer, 0, 1); assertMinusOne("post-seek buffer read", result); @@ -132,8 +137,8 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas @Test public void testSeekReadClosedFile() throws Throwable { instream = getFileSystem().open(smallSeekFile); - getLogger().debug( - "Stream is of type " + instream.getClass().getCanonicalName()); + getLogger().debug("Stream is of type {}", + instream.getClass().getCanonicalName()); instream.close(); try { instream.seek(0); @@ -168,10 +173,26 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas try { long offset = instream.getPos(); } catch (IOException e) { - // its valid to raise error here; but the test is applied to make + // it is valid to raise error here; but the test is applied to make // sure there's no other exception like an NPE. } + // a closed stream should either fail or return 0 bytes. + try { + int a = instream.available(); + LOG.info("available() returns a value on a closed file: {}", a); + assertAvailableIsZero(instream); + } catch (IOException | IllegalStateException expected) { + // expected + } + // a closed stream should either fail or return 0 bytes. 
+ try { + int a = instream.available(); + LOG.info("available() returns a value on a closed file: {}", a); + assertAvailableIsZero(instream); + } catch (IOException | IllegalStateException expected) { + // expected + } //and close again instream.close(); } @@ -205,6 +226,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas //expect that seek to 0 works instream.seek(0); int result = instream.read(); + assertAvailableIsPositive(instream); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.getPos()); @@ -226,13 +248,24 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas //go just before the end instream.seek(TEST_FILE_LEN - 2); assertTrue("Premature EOF", instream.read() != -1); + assertAvailableIsPositive(instream); assertTrue("Premature EOF", instream.read() != -1); + checkAvailabilityAtEOF(); assertMinusOne("read past end of file", instream.read()); } + /** + * This can be overridden if a filesystem always returns 1 from available(), even at EOF. + * @throws IOException IO failure + */ + protected void checkAvailabilityAtEOF() throws IOException { + assertAvailableIsZero(instream); + } + @Test public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable { - describe("do a seek past the EOF, then verify the stream recovers"); + describe("do a seek past the EOF, " + + "then verify the stream recovers"); instream = getFileSystem().open(smallSeekFile); //go just before the end. 
This may or may not fail; it may be delayed until the //read @@ -261,6 +294,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas //now go back and try to read from a valid point in the file instream.seek(1); assertTrue("Premature EOF", instream.read() != -1); + assertAvailableIsPositive(instream); } /** @@ -278,6 +312,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas //expect that seek to 0 works instream.seek(0); int result = instream.read(); + assertAvailableIsPositive(instream); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.read()); @@ -296,6 +331,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas instream.seek(0); assertEquals(0, instream.getPos()); instream.read(); + assertAvailableIsPositive(instream); assertEquals(1, instream.getPos()); byte[] buf = new byte[80 * 1024]; instream.readFully(1, buf, 0, buf.length); @@ -314,7 +350,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas instream.seek(39999); assertTrue(-1 != instream.read()); assertEquals(40000, instream.getPos()); - + assertAvailableIsPositive(instream); int v = 256; byte[] readBuffer = new byte[v]; assertEquals(v, instream.read(128, readBuffer, 0, v)); @@ -322,6 +358,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas assertEquals(40000, instream.getPos()); //content is the same too assertEquals("@40000", block[40000], (byte) instream.read()); + assertAvailableIsPositive(instream); //now verify the picked up data for (int i = 0; i < 256; i++) { assertEquals("@" + i, block[i + 128], readBuffer[i]); @@ -376,6 +413,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas assertEquals(0, instream.getPos()); byte[] buffer = new byte[1]; instream.readFully(0, buffer, 0, 0); + assertAvailableIsZero(instream); assertEquals(0, instream.getPos()); // seek to 0 read 0 bytes 
from it instream.seek(0); @@ -551,7 +589,9 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas fail("Expected an exception, got " + r); } catch (EOFException e) { handleExpectedException(e); - } catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) { + } catch (IOException + | IllegalArgumentException + | IndexOutOfBoundsException e) { handleRelaxedException("read() with a negative position ", "EOFException", e); @@ -587,6 +627,29 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas instream = getFileSystem().open(smallSeekFile); instream.seek(TEST_FILE_LEN -1); assertTrue("read at last byte", instream.read() > 0); + assertAvailableIsZero(instream); assertEquals("read just past EOF", -1, instream.read()); } + + /** + * Assert that the number of bytes available is zero. + * @param in input stream + */ + protected static void assertAvailableIsZero(FSDataInputStream in) + throws IOException { + assertEquals("stream.available() should be zero", + 0, in.available()); + } + + /** + * Assert that the number of bytes available is greater than zero. 
+ * @param in input stream + */ + protected static void assertAvailableIsPositive(FSDataInputStream in) + throws IOException { + int available = in.available(); + assertTrue("stream.available() should be positive but is " + + available, + available > 0); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index c92a85e..8aac868 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -218,7 +218,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, } @Override - public synchronized long getPos() throws IOException { + public synchronized long getPos() { return (nextReadPos < 0) ? 0 : nextReadPos; } @@ -620,15 +620,26 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, return isObjectStreamOpen(); } + /** + * Return the number of bytes available. + * If the inner stream is closed, the value is 1 for consistency + * with S3ObjectStream -and so address the GZip bug + * http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144 . + * If the stream is open, then it is the amount returned by the + * wrapped stream. + * @return a value greater than or equal to zero. + * @throws IOException IO failure. + */ @Override public synchronized int available() throws IOException { checkNotClosed(); - - long remaining = remainingInFile(); - if (remaining > Integer.MAX_VALUE) { - return Integer.MAX_VALUE; + if (contentLength == 0 || (nextReadPos >= contentLength)) { + return 0; } - return (int)remaining; + + return wrappedStream == null + ? 
1 + : wrappedStream.available(); } /** @@ -637,8 +648,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, */ @InterfaceAudience.Private @InterfaceStability.Unstable - public synchronized long remainingInFile() { - return this.contentLength - this.pos; + public synchronized long remainingInFile() throws IOException { + return contentLength - getPos(); } /** @@ -649,7 +660,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, @InterfaceAudience.Private @InterfaceStability.Unstable public synchronized long remainingInCurrentRequest() { - return this.contentRangeFinish - this.pos; + return contentRangeFinish - getPos(); } @InterfaceAudience.Private diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java index 9332621..3513d01 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java @@ -80,7 +80,7 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest { * which S3A Supports. * @return a list of seek policies to test. */ - @Parameterized.Parameters + @Parameterized.Parameters(name = "{0}-{1}") public static Collection<Object[]> params() { return Arrays.asList(new Object[][]{ {INPUT_FADV_SEQUENTIAL, Default_JSSE}, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org