>From Murtadha Hubail <[email protected]>: Murtadha Hubail has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18207 )
Change subject: [ASTERIXDB-3368][EXT] Abort S3 streams before closing ...................................................................... [ASTERIXDB-3368][EXT] Abort S3 streams before closing - user model changes: no - storage format changes: no - interface changes: no Details: - Abort the S3 stream to avoid fully consuming the stream in cases where we would like to close the stream early. Change-Id: I5e85ab19734f417e6a38b522db5298534951687e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18207 Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Michael Blow <[email protected]> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java 1 file changed, 29 insertions(+), 1 deletion(-) Approvals: Michael Blow: Looks good to me, approved; Verified Jenkins: Verified Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index f14af53..6386813 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -38,6 +38,7 @@ import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.util.LogRedactionUtil; +import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -49,6 +50,7 @@ // Configuration private final String bucket; private final S3Client s3Client; + private ResponseInputStream<?> s3InStream; private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException { @@ -83,7 +85,8 @@ int retries = 0; while (retries < MAX_RETRIES) { try { - in = s3Client.getObject(request); + s3InStream = s3Client.getObject(request); + in = s3InStream; break; } catch (NoSuchKeyException ex) { LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket " @@ -115,6 +118,9 @@ @Override public void close() throws IOException { if (in != null) { + if (s3InStream != null) { + s3InStream.abort(); + } CleanupUtils.close(in, null); } if (s3Client != null) { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18207 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: neo Gerrit-Change-Id: I5e85ab19734f417e6a38b522db5298534951687e Gerrit-Change-Number: 18207 Gerrit-PatchSet: 2 Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
