This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9c07822 [FLINK-11187] [s3] Use file over stream for writes 9c07822 is described below commit 9c07822acedfd7e0582481de620e5af205f4aef3 Author: Addison Higham <ahig...@instructure.com> AuthorDate: Tue Jan 8 15:32:28 2019 -0700 [FLINK-11187] [s3] Use file over stream for writes This changes the S3AccessHelper API to take a file instead of an input stream. This allows the s3 client to properly reset a file on retry, rather than a stream, for writes. This fixes an issue where the underlying s3 implementation has an intermittent failure, tries to reset the stream, fails to do so, and results in hung requests with delayed errors. --- .../fs/s3/common/utils/RefCountedBufferingFileStream.java | 7 ++----- .../flink/fs/s3/common/utils/RefCountedFSOutputStream.java | 8 ++++---- .../s3/common/writer/RecoverableMultiPartUploadImpl.java | 9 ++++----- .../apache/flink/fs/s3/common/writer/S3AccessHelper.java | 12 +++++------- .../s3/common/utils/RefCountedBufferingFileStreamTest.java | 3 ++- .../common/writer/RecoverableMultiPartUploadImplTest.java | 14 +++++++------- .../common/writer/S3RecoverableFsDataOutputStreamTest.java | 9 ++++----- .../org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java | 9 ++++----- 8 files changed, 32 insertions(+), 39 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java index 8f3aff8..29f2590 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java @@ -24,9 +24,6 @@ import org.apache.flink.util.function.FunctionWithException; import java.io.File; import java.io.IOException; 
-import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.StandardOpenOption; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -64,8 +61,8 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream { } @Override - public InputStream getInputStream() throws IOException { - return Files.newInputStream(currentTmpFile.getFile().toPath(), StandardOpenOption.READ); + public File getInputFile() { + return currentTmpFile.getFile(); } @Override diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java index d4b962e..d51e37e 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java @@ -21,8 +21,8 @@ package org.apache.flink.fs.s3.common.utils; import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.FSDataOutputStream; +import java.io.File; import java.io.IOException; -import java.io.InputStream; /** * A {@link FSDataOutputStream} with the {@link RefCounted} functionality. @@ -31,11 +31,11 @@ import java.io.InputStream; public abstract class RefCountedFSOutputStream extends FSDataOutputStream implements RefCounted { /** - * Gets an {@link InputStream} that allows to read the contents of the file. + * Gets the underlying {@link File} that allows to read the contents of the file. * - * @return An input stream to the contents of the file. + * @return A handle to the File object. */ - public abstract InputStream getInputStream() throws IOException; + public abstract File getInputFile(); /** * Checks if the file is closed for writes. 
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java index 9d88e65..0d0998a 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java @@ -30,7 +30,6 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ -173,8 +172,8 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload // first, upload the trailing data file. during that time, other in-progress uploads may complete. final String incompletePartObjectName = createIncompletePartObjectName(); file.retain(); - try (InputStream inputStream = file.getInputStream()) { - s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos()); + try { + s3AccessHelper.putObject(incompletePartObjectName, file.getInputFile()); } finally { file.release(); @@ -315,8 +314,8 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload @Override public void run() { - try (final InputStream inputStream = file.getInputStream()) { - final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos()); + try { + final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, file.getInputFile(), file.getPos()); future.complete(new PartETag(result.getPartNumber(), result.getETag())); file.release(); } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java 
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java index bcdea3c..593d9d3 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java @@ -28,7 +28,6 @@ import com.amazonaws.services.s3.model.UploadPartResult; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -59,25 +58,24 @@ public interface S3AccessHelper { * @param key the key this MPU is associated with. * @param uploadId the id of the MPU. * @param partNumber the number of the part being uploaded (has to be in [1 ... 10000]). - * @param file the (local) file holding the part to be uploaded. + * @param inputFile the (local) file holding the part to be uploaded. * @param length the length of the part. * @return The {@link UploadPartResult result} of the attempt to upload the part. * @throws IOException */ - UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException; + UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException; /** - * Uploads an object to S3. Contrary to the {@link #uploadPart(String, String, int, InputStream, long)} method, + * Uploads an object to S3. Contrary to the {@link #uploadPart(String, String, int, File, long)} method, * this object is not going to be associated to any MPU and, as such, it is not subject to the garbage collection * policies specified for your S3 bucket. * * @param key the key used to identify this part. - * @param file the (local) file holding the data to be uploaded. - * @param length the size of the data to be uploaded. + * @param inputFile the (local) file holding the data to be uploaded. 
* @return The {@link PutObjectResult result} of the attempt to stage the incomplete part. * @throws IOException */ - PutObjectResult putObject(String key, InputStream file, long length) throws IOException; + PutObjectResult putObject(String key, File inputFile) throws IOException; /** * Finalizes a Multi-Part Upload. diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java index 5b7d1cc..50ea9bd 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; @@ -121,7 +122,7 @@ public class RefCountedBufferingFileStreamTest { Assert.assertEquals(contentToWrite.length, stream.getPos()); final byte[] contentRead = new byte[contentToWrite.length]; - stream.getInputStream().read(contentRead, 0, contentRead.length); + new FileInputStream(stream.getInputFile()).read(contentRead, 0, contentRead.length); Assert.assertTrue(Arrays.equals(contentToWrite, contentRead)); stream.release(); diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java index 673796d..0194065 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java +++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java @@ -35,8 +35,8 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -361,14 +361,14 @@ public class RecoverableMultiPartUploadImplTest { } @Override - public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException { - final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length)); + public UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException { + final byte[] content = getFileContentBytes(inputFile, MathUtils.checkedDownCast(length)); return storeAndGetUploadPartResult(key, partNumber, content); } @Override - public PutObjectResult putObject(String key, InputStream file, long length) throws IOException { - final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length)); + public PutObjectResult putObject(String key, File inputFile) throws IOException { + final byte[] content = getFileContentBytes(inputFile, MathUtils.checkedDownCast(inputFile.length())); return storeAndGetPutObjectResult(key, content); } @@ -397,9 +397,9 @@ public class RecoverableMultiPartUploadImplTest { return null; } - private byte[] getFileContentBytes(InputStream file, int length) throws IOException { + private byte[] getFileContentBytes(File file, int length) throws IOException { final byte[] content = new byte[length]; - file.read(content, 0, length); + new FileInputStream(file).read(content, 0, length); return content; } diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java index 7a32392..14ed2e2 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java @@ -39,8 +39,8 @@ import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.FileAlreadyExistsException; @@ -314,10 +314,9 @@ public class S3RecoverableFsDataOutputStreamTest { private static byte[] readFileContents(RefCountedFSOutputStream file) throws IOException { final byte[] content = new byte[MathUtils.checkedDownCast(file.getPos())]; - try (InputStream inputStream = file.getInputStream()) { - int bytesRead = inputStream.read(content, 0, content.length); // TODO: 10/2/18 see if closed in download - Assert.assertEquals(file.getPos(), bytesRead); - } + File inputFile = file.getInputFile(); + long bytesRead = new FileInputStream(inputFile).read(content, 0, MathUtils.checkedDownCast(inputFile.length())); + Assert.assertEquals(file.getPos(), bytesRead); return content; } diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java index b9612ad..4d37ce0 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java @@ -38,7 +38,6 @@ import org.apache.hadoop.fs.s3a.WriteOperationHelper; import java.io.File; 
import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -68,15 +67,15 @@ public class HadoopS3AccessHelper implements S3AccessHelper { } @Override - public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException { + public UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException { final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest( - key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L); + key, uploadId, partNumber, MathUtils.checkedDownCast(length), null, inputFile, 0L); return s3accessHelper.uploadPart(uploadRequest); } @Override - public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException { - final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length); + public PutObjectResult putObject(String key, File inputFile) throws IOException { + final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputFile); return s3accessHelper.putObject(putRequest); }