Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 merged PR #18891: URL: https://github.com/apache/druid/pull/18891 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2916051576
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/ITS3ToS3ParallelIndexTest.java:
##
@@ -40,7 +39,11 @@ public void testS3IndexData(Pair>
s3InputSource) throws Exceptio
@MethodSource("resources")
public void testS3IndexData_withTempCredentials(Pair>
s3InputSource) throws Exception
{
-final S3InputSourceConfig inputSourceConfig =
minIOStorageResource.createTempCredentialsForInputSource();
-doTest(s3InputSource, new Pair<>(false, false), "s3", inputSourceConfig);
+doTestWithEndpointConfig(
Review Comment:
No worries! Thanks for the clarification!
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2916015177
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/ITS3ToS3ParallelIndexTest.java:
##
@@ -40,7 +39,11 @@ public void testS3IndexData(Pair>
s3InputSource) throws Exceptio
@MethodSource("resources")
public void testS3IndexData_withTempCredentials(Pair>
s3InputSource) throws Exception
{
-final S3InputSourceConfig inputSourceConfig =
minIOStorageResource.createTempCredentialsForInputSource();
-doTest(s3InputSource, new Pair<>(false, false), "s3", inputSourceConfig);
+doTestWithEndpointConfig(
Review Comment:
Oops, missed this one. There are a lot of changes in this PR, so it has been a
bit annoying to keep track of them, apologies.
> We should probably add a separate test method instead of updating this one.
The original test would break without this update. SDK v2 requires an ARN, and
MinIO basically just checks whether that ARN is valid (which includes needing a
signing URL).
Re: [PR] Upgrade to AWS SDK V2 (druid)
github-advanced-security[bot] commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2916020539
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##
@@ -328,23 +371,41 @@
public static void deleteBucketKeys(
ServerSideEncryptingAmazonS3 s3Client,
String bucket,
- List keysToDelete,
+ List keysToDelete,
int retries
)
throws Exception
{
if (keysToDelete != null && log.isDebugEnabled()) {
List keys = keysToDelete.stream()
- .map(DeleteObjectsRequest.KeyVersion::getKey)
+ .map(ObjectIdentifier::key)
.collect(Collectors.toList());
log.debug("Deleting keys from bucket: [%s], keys: [%s]", bucket, keys);
}
-DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
-S3Utils.retryS3Operation(() -> {
- s3Client.deleteObjects(deleteRequest);
- return null;
-}, retries);
-log.info("Deleted %d files", keysToDelete.size());
+List remaining = keysToDelete;
+List lastErrors = null;
+for (int attempt = 0; attempt <= retries; attempt++) {
+ DeleteObjectsRequest deleteRequest = DeleteObjectsRequest.builder()
+ .bucket(bucket)
+ .delete(Delete.builder().objects(remaining).build())
+ .build();
+ DeleteObjectsResponse response = retryS3Operation(() -> s3Client.deleteObjects(deleteRequest));
+ if (!response.hasErrors()) {
+log.info("Deleted %d files", keysToDelete.size());
Review Comment:
## Dereferenced variable may be null
Variable [keysToDelete](1) may be null at this access as suggested by
[this](2) null guard.
[Show more details](https://github.com/apache/druid/security/code-scanning/10875)
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##
@@ -147,65 +153,74 @@
private boolean deleteKeysForBucket(
ServerSideEncryptingAmazonS3 s3Client,
String s3Bucket,
- List keysToDelete
+ List keysToDelete
)
{
boolean hadException = false;
-DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
-deleteObjectsRequest.setQuiet(true);
-List> keysChunks = Lists.partition(
+List> keysChunks = Lists.partition(
keysToDelete,
MAX_MULTI_OBJECT_DELETE_SIZE
);
-for (List chunkOfKeys : keysChunks) {
- List keysToDeleteStrings = chunkOfKeys.stream().map(
- DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+for (List chunkOfKeys : keysChunks) {
try {
-deleteObjectsRequest.setKeys(chunkOfKeys);
log.info(
"Deleting the following segment files from S3 bucket[%s]: [%s]",
s3Bucket,
-keysToDeleteStrings
-);
-S3Utils.retryS3Operation(
-() -> {
- s3Client.deleteObjects(deleteObjectsRequest);
- return null;
-},
-3
+ chunkOfKeys.stream().map(ObjectIdentifier::key).collect(Collectors.toList())
);
- }
- catch (MultiObjectDeleteException e) {
-hadException = true;
-Map> errorToKeys = new HashMap<>();
-for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
- errorToKeys.computeIfAbsent(StringUtils.format(
- MULTI_OBJECT_DELETE_EXEPTION_ERROR_FORMAT,
- error.getMessage(),
- error.getCode()
- ), k -> new ArrayList<>()).add(error.getKey());
+List remaining = chunkOfKeys;
+for (int attempt = 0; attempt <= NUM_RETRIES; attempt++) {
Review Comment:
## Useless comparison test
Test is always true, because of [this condition](1).
Test is always true, because of [this condition](2).
[Show more details](https://github.com/apache/druid/security/code-scanning/10876)
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2915989699
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/ITS3ToS3ParallelIndexTest.java:
##
@@ -40,7 +39,11 @@ public void testS3IndexData(Pair>
s3InputSource) throws Exceptio
@MethodSource("resources")
public void testS3IndexData_withTempCredentials(Pair>
s3InputSource) throws Exception
{
-final S3InputSourceConfig inputSourceConfig =
minIOStorageResource.createTempCredentialsForInputSource();
-doTest(s3InputSource, new Pair<>(false, false), "s3", inputSourceConfig);
+doTestWithEndpointConfig(
Review Comment:
Please avoid resolving comments if they are not being addressed. 🙂
It makes it difficult for the reviewer to figure out why a change should not
be made.
In such cases, it is best to add a reply clarifying why the change is not
needed and leave the comment unresolved to keep the discussion open.
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2915915008
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##
@@ -94,32 +97,34 @@ public void kill(List segments) throws
SegmentLoadingException
final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
// create a map of bucket to keys to delete
-Map> bucketToKeysToDelete =
new HashMap<>();
+Map> bucketToKeysToDelete = new HashMap<>();
for (DataSegment segment : segments) {
String s3Bucket = MapUtils.getString(segment.getLoadSpec(),
S3DataSegmentPuller.BUCKET);
String path = MapUtils.getString(segment.getLoadSpec(),
S3DataSegmentPuller.KEY);
- List keysToDelete =
bucketToKeysToDelete.computeIfAbsent(
+ List keysToDelete =
bucketToKeysToDelete.computeIfAbsent(
s3Bucket,
k -> new ArrayList<>()
);
if (path.endsWith("/")) {
// segment is not compressed, list objects and add them all to delete
list
-final ListObjectsV2Result list = s3Client.listObjectsV2(
-new
ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(path)
-);
-for (S3ObjectSummary objectSummary : list.getObjectSummaries()) {
- keysToDelete.add(new
DeleteObjectsRequest.KeyVersion(objectSummary.getKey()));
+ListObjectsV2Request request = ListObjectsV2Request.builder()
Review Comment:
Nit: The `.builder()` in several of the files will lead to unnecessary
reformats later. Please check if they can be moved to the next line in all such
places.
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-4033955804
> The latest updates look good to me as far as resolving the concerns from the limited pass I did-- thanks @jtuglu1! I am stopping short of doing a full ✅ because I only did that limited pass. The full review I am leaving to @kfaraz.
Thanks. I addressed Kashif's comments as well. I just need to get docker tests to pass then I will merge.
Re: [PR] Upgrade to AWS SDK V2 (druid)
gianm commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-4033941769
The latest updates look good to me as far as resolving the concerns from the limited pass I did-- thanks @jtuglu1! I am stopping short of doing a full ✅ because I only did that limited pass. The full review I am leaving to @kfaraz.
Re: [PR] Upgrade to AWS SDK V2 (druid)
gianm commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2912914607
##
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java:
##
@@ -383,22 +392,26 @@ public void
test_kill_listOfSegments_multiDeleteExceptionIsThrown()
@Test
public void
test_kill_listOfSegments_multiDeleteExceptionIsThrownMultipleTimes()
{
-DeleteObjectsRequest deleteObjectsRequest = new
DeleteObjectsRequest(TEST_BUCKET);
Review Comment:
Same issue as `test_kill_listOfSegments_multiDeleteExceptionIsThrown`. We
should use a regular `DeleteObjectsResponse` with errors set.
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##
@@ -147,65 +152,69 @@ public void kill(List segments) throws
SegmentLoadingException
private boolean deleteKeysForBucket(
ServerSideEncryptingAmazonS3 s3Client,
String s3Bucket,
- List keysToDelete
+ List keysToDelete
)
{
boolean hadException = false;
-DeleteObjectsRequest deleteObjectsRequest = new
DeleteObjectsRequest(s3Bucket);
-deleteObjectsRequest.setQuiet(true);
-List> keysChunks = Lists.partition(
+List> keysChunks = Lists.partition(
keysToDelete,
MAX_MULTI_OBJECT_DELETE_SIZE
);
-for (List chunkOfKeys : keysChunks) {
+for (List chunkOfKeys : keysChunks) {
List keysToDeleteStrings = chunkOfKeys.stream().map(
- DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+ ObjectIdentifier::key).collect(Collectors.toList());
try {
-deleteObjectsRequest.setKeys(chunkOfKeys);
+DeleteObjectsRequest deleteObjectsRequest =
DeleteObjectsRequest.builder()
+.bucket(s3Bucket)
+.delete(Delete.builder()
+.objects(chunkOfKeys)
+.quiet(true)
+.build())
+.build();
log.info(
"Deleting the following segment files from S3 bucket[%s]: [%s]",
s3Bucket,
keysToDeleteStrings
);
S3Utils.retryS3Operation(
() -> {
- s3Client.deleteObjects(deleteObjectsRequest);
+ DeleteObjectsResponse response =
s3Client.deleteObjects(deleteObjectsRequest);
+ // Check for errors in the response
+ if (response.hasErrors()) {
Review Comment:
The old code for `deleteKeysForBucket`, if there was an error in a
multi-delete, would set `hadException = true` and then return `true` (meaning
there was an issue deleting keys). That would then cause the caller (`kill`) to
throw a `SegmentLoadingException`. Now, the exception is suppressed and `kill`
will treat it as a success. Please restore the old behavior.
There's a retry issue too. The old code would sometimes retry
`MultiObjectDeleteException` (see my comment in `S3Utils`) due to some logic
for retrying it in `retryS3Operation`. The new code does not retry, just
silently treats it as a success (after logging some warnings).
It may be best to focus on fixing `S3Utils.deleteBucketKeys` and then use
that here. Then we only need to get the error/retry logic right in one place.
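To make the requested behavior concrete, here is a minimal sketch of the error propagation described above: every chunk is attempted, any per-key failure sets the flag, and the caller turns that flag into an exception. It is not the code from this PR. `ChunkedDelete`, `deleteInChunks`, `kill`, and the `deleteBatch` function (a stand-in for the S3 multi-delete call that returns the keys the response reported as errors) are hypothetical names, and `IllegalStateException` stands in for `SegmentLoadingException`.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not the PR's implementation.
final class ChunkedDelete
{
  /**
   * Deletes keys chunk by chunk. Returns true if any chunk reported at least
   * one failed key, mirroring the old hadException flag.
   *
   * deleteBatch is an assumed stand-in for the S3 call: it takes a chunk of
   * keys and returns the keys that failed to be deleted.
   */
  static boolean deleteInChunks(
      List<String> keys,
      int chunkSize,
      Function<List<String>, List<String>> deleteBatch
  )
  {
    boolean hadException = false;
    for (int i = 0; i < keys.size(); i += chunkSize) {
      List<String> chunk = keys.subList(i, Math.min(i + chunkSize, keys.size()));
      List<String> failed = deleteBatch.apply(chunk);
      if (!failed.isEmpty()) {
        // Remember the failure but keep going so later chunks are still attempted.
        hadException = true;
      }
    }
    return hadException;
  }

  /** Caller-side behavior: a reported failure must surface as an exception. */
  static void kill(
      List<String> keys,
      int chunkSize,
      Function<List<String>, List<String>> deleteBatch
  )
  {
    if (deleteInChunks(keys, chunkSize, deleteBatch)) {
      // Stand-in for SegmentLoadingException in the real code.
      throw new IllegalStateException("Couldn't delete all keys; see logs for details");
    }
  }
}
```

The point of the sketch is only that a failure reported in the response body must reach the caller; how the real code logs and groups the errors is left out.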
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##
@@ -328,18 +418,21 @@ public static void deleteObjectsInPath(
public static void deleteBucketKeys(
ServerSideEncryptingAmazonS3 s3Client,
String bucket,
- List keysToDelete,
+ List keysToDelete,
int retries
)
throws Exception
{
if (keysToDelete != null && log.isDebugEnabled()) {
List keys = keysToDelete.stream()
- .map(DeleteObjectsRequest.KeyVersion::getKey)
+ .map(ObjectIdentifier::key)
.collect(Collectors.toList());
log.debug("Deleting keys from bucket: [%s], keys: [%s]", bucket, keys);
}
-DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
+DeleteObjectsRequest deleteRequest = DeleteObjectsRequest.builder()
+.bucket(bucket)
+.delete(Delete.builder().objects(keysToDelete).build())
+.build();
S3Utils.retryS3Operation(() -> {
s3Client.deleteObjects(deleteRequest);
Review Comment:
Need to check `hasErrors` in the response. Related: the old `S3RETRY`
checked for `MultiObjectDeleteException` and would retry in certain cases.
That's lost now, please restore it. The best way I can think of to do that is
to introduce our own version of `MultiObjectDeleteException` and throw it when
`hasErrors` is true, and then add our version to `S3RETRY`.
Please add a test for this scenario too.
For extra credit only retry the objects that failed to be deleted!
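As an illustration of the "only retry the objects that failed" idea, the following is a rough sketch under stated assumptions, not the change that was eventually made. It uses no AWS types: `deleteBatch` is a hypothetical stand-in for a call that issues the multi-delete and returns the keys listed as errors in the response, and `BatchDeleteFailedException` is a hypothetical name for "our own version of `MultiObjectDeleteException`".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not the PR's implementation.
final class PartialDeleteRetry
{
  /** Assumed replacement for the v1 MultiObjectDeleteException. */
  static final class BatchDeleteFailedException extends RuntimeException
  {
    private final List<String> failedKeys;

    BatchDeleteFailedException(List<String> failedKeys)
    {
      super("Failed to delete " + failedKeys.size() + " keys: " + failedKeys);
      this.failedKeys = new ArrayList<>(failedKeys);
    }

    List<String> getFailedKeys()
    {
      return failedKeys;
    }
  }

  /**
   * Makes one initial attempt plus up to `retries` retries. Each retry
   * resubmits only the keys that the previous attempt reported as failed.
   */
  static void deleteWithRetries(
      List<String> keys,
      int retries,
      Function<List<String>, List<String>> deleteBatch
  )
  {
    if (keys == null || keys.isEmpty()) {
      return; // nothing to delete; also avoids dereferencing a null list
    }
    List<String> remaining = new ArrayList<>(keys);
    for (int attempt = 0; attempt <= retries && !remaining.isEmpty(); attempt++) {
      // Narrow the working set to the keys that still need deleting.
      remaining = new ArrayList<>(deleteBatch.apply(remaining));
    }
    if (!remaining.isEmpty()) {
      throw new BatchDeleteFailedException(remaining);
    }
  }
}
```

A real version would also need to decide which error codes are worth retrying and whether to back off between attempts; the sketch retries every reported failure immediately.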
##
extensions-core/
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2863249315
##
pom.xml:
##
@@ -119,7 +119,8 @@
3.3.6
22.3.5
5.14.2
-1.12.784
+1.12.784
Review Comment:
Please add a comment on where v1 is still used and if there is a plan to
eventually get rid of it.
I guess it is only used with Hadoop, which we should be able to remove soon in
#19109.
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2863249315
##
pom.xml:
##
@@ -119,7 +119,8 @@
3.3.6
22.3.5
5.14.2
-1.12.784
+1.12.784
Review Comment:
Please add a comment on where v1 is still used and if there is a plan to
eventually get rid of it.
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##
@@ -229,11 +288,42 @@ static String constructSegmentBasePath(String baseKey,
String storageDir)
) + "/";
}
- static AccessControlList
grantFullControlToBucketOwner(ServerSideEncryptingAmazonS3 s3Client, String
bucket)
+ static Grant grantFullControlToBucketOwner(ServerSideEncryptingAmazonS3
s3Client, String bucket)
{
-final AccessControlList acl = s3Client.getBucketAcl(bucket);
-acl.grantAllPermissions(new Grant(new
CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl));
-return acl;
+final String ownerId = s3Client.getBucketAcl(bucket).owner().id();
+return Grant.builder()
+.grantee(Grantee.builder()
+.type(Type.CANONICAL_USER)
+.id(ownerId)
+.build())
Review Comment:
nit: formatting here
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java:
##
@@ -84,6 +84,7 @@
*
* This tests both InputFormat and Parser. Parser is deprecated for Streaming
Ingestion,
* and those tests will be removed in the future.
+ *
Review Comment:
Nit: extra newline
##
cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java:
##
@@ -56,6 +72,21 @@ public boolean isForceGlobalBucketAccessEnabled()
return forceGlobalBucketAccessEnabled;
}
+ public int getConnectionTimeout()
+ {
+return connectionTimeout;
+ }
+
+ public int getSocketTimeout()
Review Comment:
Maybe rename these to `getXyzTimeoutMillis()` to avoid ambiguity.
Other millis-related configs typically use the type `long` but I guess these
fields will never actually need to be that large.
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java:
##
@@ -128,16 +129,18 @@ private Optional streamTaskFile(final long
offset, String taskKey)
start = contentLength + offset;
}
- final GetObjectRequest request = new
GetObjectRequest(config.getS3Bucket(), taskKey)
- .withMatchingETagConstraint(ensureQuotated(objectMetadata.getETag()))
- .withRange(start, end);
+ GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
+ .bucket(config.getS3Bucket())
+ .key(taskKey)
+ .ifMatch(ensureQuotated(objectMetadata.eTag()))
+ .range(StringUtils.format("bytes=%d-%d", start, end));
Review Comment:
For the range stuff, I think it would be useful to write an `AwsBytesRange`
utility class that takes optional start, end as inputs and exposes a method
`String getBytesRange()`. That would help avoid doing the String formatting in
multiple places and keep the code less error prone.
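A minimal sketch of what such a utility could look like, assuming the class name and `getBytesRange()` method proposed above. The factory methods `of`, `from`, and `lastBytes` are hypothetical additions for illustration. The output follows the standard HTTP `Range` header syntax (`bytes=first-last`, `bytes=first-`, `bytes=-suffixLength`), which is the string form that `GetObjectRequest.Builder#range` takes in SDK v2.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the suggested utility, not code from this PR.
final class AwsBytesRange
{
  private final OptionalLong start;
  private final OptionalLong end;

  private AwsBytesRange(OptionalLong start, OptionalLong end)
  {
    this.start = start;
    this.end = end;
  }

  /** Closed range: bytes start..end, both inclusive. */
  static AwsBytesRange of(long start, long end)
  {
    if (start < 0 || end < start) {
      throw new IllegalArgumentException("Invalid range [" + start + ", " + end + "]");
    }
    return new AwsBytesRange(OptionalLong.of(start), OptionalLong.of(end));
  }

  /** Open-ended range: from start to the end of the object. */
  static AwsBytesRange from(long start)
  {
    if (start < 0) {
      throw new IllegalArgumentException("Invalid start [" + start + "]");
    }
    return new AwsBytesRange(OptionalLong.of(start), OptionalLong.empty());
  }

  /** Suffix range: the last `length` bytes of the object. */
  static AwsBytesRange lastBytes(long length)
  {
    if (length <= 0) {
      throw new IllegalArgumentException("Invalid suffix length [" + length + "]");
    }
    return new AwsBytesRange(OptionalLong.empty(), OptionalLong.of(length));
  }

  /** Formats the range as an HTTP Range header value. */
  String getBytesRange()
  {
    return "bytes="
           + (start.isPresent() ? String.valueOf(start.getAsLong()) : "")
           + "-"
           + (end.isPresent() ? String.valueOf(end.getAsLong()) : "");
  }
}
```

With something like this, the call site in the diff above could read `.range(AwsBytesRange.of(start, end).getBytesRange())` instead of formatting the string inline.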
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/ITS3ToS3ParallelIndexTest.java:
##
@@ -40,7 +39,11 @@ public void testS3IndexData(Pair>
s3InputSource) throws Exceptio
@MethodSource("resources")
public void testS3IndexData_withTempCredentials(Pair>
s3InputSource) throws Exception
{
-final S3InputSourceConfig inputSourceConfig =
minIOStorageResource.createTempCredentialsForInputSource();
-doTest(s3InputSource, new Pair<>(false, false), "s3", inputSourceConfig);
+doTestWithEndpointConfig(
Review Comment:
We should probably add a separate test method instead of updating this one.
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##
@@ -65,6 +64,42 @@ public class S3Utils
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private static final Logger log = new Logger(S3Utils.class);
+ /**
+ * A holder for S3Object with its associated bucket name.
+ * In AWS SDK v2, S3Object doesn't include the bucket name, so we need to
track it separately.
+ */
+ public static class S3ObjectWithBucket
Review Comment:
This should be in a separate file and not inside the `S3Utils` class.
##
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java:
##
@@ -616,132 +559,49 @@ public void testSeekUnassigned() throws
InterruptedException
recordSupplier.seekToEarliest(Collections.singleton(shard0));
}
-
- @Test
- public void testPollAfterSeek()
Review Comment:
Is this test not applicable anymore with v2?
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java:
##
@@ -
Re: [PR] Upgrade to AWS SDK V2 (druid)
gianm commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-4026244923
> @gianm would you be able to take a look at this? Would like to get this merged as this branch is starting to accumulate conflicts
I am hoping @kfaraz can finish out the review.
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-4025903703
Have had this PR half-reviewed for more than a week now. My backlog of reviews is pretty much done now, I should be able to finally get back to this. I will go through the major files tomorrow and most likely approve it. Any other minor improvement can always be done later. cc: @jtuglu1
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-4025290571
@gianm would you be able to take a look at this? Would like to get this merged as this branch is starting to accumulate conflicts
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-4006513481
@kfaraz bumping this
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3995011818
Checking in on this – @kfaraz the failing tests pass locally for me. I was
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 closed pull request #18891: Upgrade to AWS SDK V2
URL: https://github.com/apache/druid/pull/18891
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3985727807
Yes, @jtuglu1 , I am almost done going through the files. A few remain, after which I will approve and merge this PR tomorrow. Everything looks good so far.
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3985714284
@kfaraz any thoughts here?
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2850816765
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionRollingUpgradeDockerTest.java:
##
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskBuilder;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.storage.s3.output.S3StorageConnectorModule;
+import org.apache.druid.testing.DruidCommand;
+import org.apache.druid.testing.DruidContainer;
+import org.apache.druid.testing.MountedDir;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
+import org.apache.druid.testing.embedded.msq.MinIODurableStorageResource;
+import org.apache.druid.testing.embedded.psql.PostgreSQLMetadataResource;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that ingestion works correctly during a rolling upgrade, where the
+ * cluster has a mix of old-version and new-version indexers (the assumption
is that job logic is shared between indexers and MM/Peons, indexers are just
easier to use in tests).
+ * Validates that segments and intermediate shuffle data written to S3 by one
version can be
+ * read by the other.
+ *
+ * Use this test to verify rolling upgrade safety for any change that affects
+ * how data is written to or read from deep storage or MSQ durable shuffle
+ * storage (e.g. storage format changes, SDK upgrades, serialization changes).
+ *
+ * Cluster architecture
+ *
+ * Overlord, Coordinator, Broker, Historical, Indexer (new) -- embedded
(current version)
+ * Indexer (old) -- Docker container ({@link
DruidContainer.Image#APACHE_33})
+ * Deep Storage + MSQ Shuffle -- MinIO (S3-compatible)
+ * Metadata -- PostgreSQL (required for Docker container access)
+ *
+ *
+ * Task distribution guarantees
+ * The Overlord uses {@code EqualDistributionWorkerSelectStrategy} by default,
+ * which assigns each task to the worker with the most available capacity.
+ * Both indexers are configured with capacity 2 (4 total slots).
+ *
+ * Native batch ({@code index}): Two IndexTasks are submitted
+ * concurrently to separate datasources. The first task reduces one
+ * indexer's available capacity, so the second is assigned to the other
+ * indexer.
+ * Native batch ({@code index_parallel}): The supervisor task
+ * occupies one slot, then 3 subtasks (one per input file) are submitted.
+ * With 3 remaining slots across 2 indexers, subtasks are guaranteed to
+ * land on both.
+ * MSQ: With {@code maxNumTasks=3} (1 controller + 2 workers), the
+ * controller task fills one slot, so the two worker tasks are distributed
+ * across both indexers. This guarantees cross-version S3 shuffle I/O.
+ *
+ *
+ * This test does not implement {@link LatestImageDockerTest} and runs in the
+ * {@code mvn test} phase.
+ */
+public class IngestionRollingUpgradeDockerTest extends EmbeddedClusterTest
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2850818261
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionRollingUpgradeDockerTest.java:
##
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskBuilder;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.storage.s3.output.S3StorageConnectorModule;
+import org.apache.druid.testing.DruidCommand;
+import org.apache.druid.testing.DruidContainer;
+import org.apache.druid.testing.MountedDir;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
+import org.apache.druid.testing.embedded.msq.MinIODurableStorageResource;
+import org.apache.druid.testing.embedded.psql.PostgreSQLMetadataResource;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that ingestion works correctly during a rolling upgrade, where the
+ * cluster has a mix of old-version and new-version indexers (the assumption is that job logic is shared between indexers and MM/Peons, indexers are just easier to use in tests).
+ * Validates that segments and intermediate shuffle data written to S3 by one version can be
+ * read by the other.
+ *
+ * Use this test to verify rolling upgrade safety for any change that affects
+ * how data is written to or read from deep storage or MSQ durable shuffle
+ * storage (e.g. storage format changes, SDK upgrades, serialization changes).
+ *
+ * Cluster architecture
+ *
+ * Overlord, Coordinator, Broker, Historical, Indexer (new) -- embedded (current version)
+ * Indexer (old) -- Docker container ({@link DruidContainer.Image#APACHE_33})
+ * Deep Storage + MSQ Shuffle -- MinIO (S3-compatible)
+ * Metadata -- PostgreSQL (required for Docker container access)
+ *
+ *
+ * Task distribution guarantees
+ * The Overlord uses {@code EqualDistributionWorkerSelectStrategy} by default,
+ * which assigns each task to the worker with the most available capacity.
+ * Both indexers are configured with capacity 2 (4 total slots).
+ *
+ * Native batch ({@code index}): Two IndexTasks are submitted
+ * concurrently to separate datasources. The first task reduces one
+ * indexer's available capacity, so the second is assigned to the other
+ * indexer.
+ * Native batch ({@code index_parallel}): The supervisor task
+ * occupies one slot, then 3 subtasks (one per input file) are submitted.
+ * With 3 remaining slots across 2 indexers, subtasks are guaranteed to
+ * land on both.
+ * MSQ: With {@code maxNumTasks=3} (1 controller + 2 workers), the
+ * controller task fills one slot, so the two worker tasks are distributed
+ * across both indexers. This guarantees cross-version S3 shuffle I/O.
+ *
+ *
+ * This test does not implement {@link LatestImageDockerTest} and runs in the
+ * {@code mvn test} phase.
+ */
+public class IngestionRollingUpgradeDockerTest extends EmbeddedClusterTest
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2850814098
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionRollingUpgradeDockerTest.java:
##
@@ -0,0 +1,278 @@
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2850812350
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionRollingUpgradeDockerTest.java:
##
@@ -0,0 +1,278 @@
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2850808373
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionRollingUpgradeDockerTest.java:
##
@@ -0,0 +1,278 @@
+/**
+ * Tests that ingestion works correctly during a rolling upgrade, where the
+ * cluster has a mix of old-version and new-version indexers (the assumption is that job logic is shared between indexers and MM/Peons, indexers are just easier to use in tests).
+ * Validates that segments and intermediate shuffle data written to S3 by one version can be
Review Comment:
Please remove the rest of this javadoc as all of that information is already captured in the tests themselves.
Re: [PR] Upgrade to AWS SDK V2 (druid)
kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2850806169
##
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionRollingUpgradeDockerTest.java:
##
@@ -0,0 +1,278 @@
+/**
+ * Tests that ingestion works correctly during a rolling upgrade, where the
+ * cluster has a mix of old-version and new-version indexers (the assumption is that job logic is shared between indexers and MM/Peons, indexers are just easier to use in tests).
Review Comment:
```suggestion
 * Tests that ingestion works correctly during a rolling upgrade, by using a
 * cluster which has a Historical and an Indexer on old version and
 * another Indexer and the other services on the latest version.
```
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3931916619
> ie Ingestion running on newer aws sdk and historicals running with older sdk are able to pull segments created with the new s3 client
Yes – I have tested both v2-written segments being read by v1 and vice-versa.
> MSQ tasks do share shuffle files via s3. So what happens if task (on old aws sdk) tries to write data and task (on new sdk) tries to read data.
I have not tested this yet – we currently deploy in a R/B fashion where there are never old version/new version tasks running at the same time.
Re: [PR] Upgrade to AWS SDK V2 (druid)
cryptoe commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3931825718
Thanks @jtuglu1 for your comments. While thinking through this more, I was also wondering whether we have covered rolling upgrade tests:
* i.e., ingestion running on the newer AWS SDK while Historicals running on the older SDK are able to pull segments created with the new S3 client.
* MSQ tasks share shuffle files via S3. So what happens if a task on the old AWS SDK writes data and a task on the new SDK tries to read it?
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2792531268
##
cloud/aws-common/src/main/java/org/apache/druid/common/aws/FileSessionCredentialsProvider.java:
##
@@ -53,13 +53,12 @@ public FileSessionCredentialsProvider(String sessionCredentialsFile)
}
@Override
- public AWSCredentials getCredentials()
+ public AwsCredentials resolveCredentials()
{
return awsSessionCredentials;
}
- @Override
- public void refresh()
+ private void refresh()
Review Comment:
This refresh() now happens solely through a preexisting scheduler on the
class. This is equivalent to the previous behavior, where
`FileSessionCredentialsProvider` handled its own refresh internally on its
scheduler. I don't believe the standalone method was called explicitly either;
from what I can tell, the v1 `AWSCredentialsProviderChain` didn't proactively
call refresh() on providers during getCredentials() resolution; it just
iterated through the providers, catching exceptions.
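For readers following along, the pattern described above (a provider that refreshes itself on an internal scheduler, so callers only ever resolve) can be sketched roughly as below. This is an illustration under stated assumptions, not the PR's code: `CredentialsProvider` is a hypothetical stand-in for the SDK v2 provider interface, and `SelfRefreshingProvider` and its `loader` are invented names. It uses only the JDK so it runs without the AWS SDK on the classpath.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for the SDK v2 provider interface (assumption, not the real type).
interface CredentialsProvider<T>
{
  T resolveCredentials();
}

// Hypothetical provider that owns its refresh loop; callers never trigger a refresh.
final class SelfRefreshingProvider<T> implements CredentialsProvider<T>, AutoCloseable
{
  private final Supplier<T> loader;
  private final ScheduledExecutorService scheduler;
  private volatile T current;

  SelfRefreshingProvider(Supplier<T> loader, long period, TimeUnit unit)
  {
    this.loader = loader;
    // Load once eagerly so resolveCredentials() has a value before the first tick.
    this.current = loader.get();
    this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "credentials-refresh");
      t.setDaemon(true);
      return t;
    });
    this.scheduler.scheduleAtFixedRate(this::refresh, period, period, unit);
  }

  @Override
  public T resolveCredentials()
  {
    // Pure read: returns whatever the scheduler loaded last.
    return current;
  }

  // Private: only the internal scheduler calls this.
  private void refresh()
  {
    try {
      current = loader.get();
    }
    catch (RuntimeException e) {
      // Keep serving the last successfully loaded value.
    }
  }

  @Override
  public void close()
  {
    scheduler.shutdownNow();
  }
}
```

With this shape there is no public refresh() for a provider chain to call, which matches the behavior described in the comment.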
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2792457074
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryWithBucketIterator.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import org.apache.druid.java.util.common.RE;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator that returns S3 objects along with their bucket names.
+ * This is needed because AWS SDK v2's S3Object doesn't include the bucket name.
+ */
+public class ObjectSummaryWithBucketIterator implements Iterator<S3Utils.S3ObjectWithBucket>
+{
+ private final ServerSideEncryptingAmazonS3 s3Client;
+ private final Iterator prefixesIterator;
+ private final int maxListingLength;
+ private final int maxRetries;
+
+ private String currentBucket;
+ private String currentPrefix;
+ private String continuationToken;
+ private ListObjectsV2Response result;
+ private Iterator objectSummaryIterator;
+ private S3Utils.S3ObjectWithBucket currentObjectSummary;
+
+ ObjectSummaryWithBucketIterator(
+ final ServerSideEncryptingAmazonS3 s3Client,
+ final Iterable prefixes,
+ final int maxListingLength,
+ final int maxRetries
+ )
+ {
+this.s3Client = s3Client;
+this.prefixesIterator = prefixes.iterator();
+this.maxListingLength = maxListingLength;
+this.maxRetries = maxRetries;
+
+prepareNextRequest();
Review Comment:
I've moved this out of the constructor and into an initialize method. Iterators
are expected to be accessed from a single thread, but I'll make it thread-safe
just in case.
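One way to picture the change described above is an iterator whose constructor does no remote work and whose first fetch happens on the first `hasNext()`/`next()` call, with both methods synchronized. The sketch below is a generic illustration only, not the PR's `ObjectSummaryWithBucketIterator`: `LazyPagedIterator` and `pageFetcher` are hypothetical names, and an empty page is assumed to signal the end of the listing.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

// Hypothetical paginated iterator that defers all fetching until first use.
final class LazyPagedIterator<T> implements Iterator<T>
{
  // Assumption: returns the page at the given index, or an empty list when no pages remain.
  private final IntFunction<List<T>> pageFetcher;
  private Iterator<T> page = Collections.emptyIterator();
  private int nextPageIndex;
  private boolean exhausted;

  LazyPagedIterator(IntFunction<List<T>> pageFetcher)
  {
    // No fetch here, so constructing the iterator cannot fail on I/O.
    this.pageFetcher = pageFetcher;
  }

  // Fetches pages until the current one has an element or the listing ends.
  private void advance()
  {
    while (!page.hasNext() && !exhausted) {
      final List<T> batch = pageFetcher.apply(nextPageIndex++);
      if (batch.isEmpty()) {
        exhausted = true;
      } else {
        page = batch.iterator();
      }
    }
  }

  @Override
  public synchronized boolean hasNext()
  {
    advance();
    return page.hasNext();
  }

  @Override
  public synchronized T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return page.next();
  }
}
```

Synchronizing `hasNext()` and `next()` individually keeps internal state consistent, but a `hasNext()`-then-`next()` sequence is still not atomic across threads, so callers sharing one iterator would need their own coordination.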
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2792439801
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryWithBucketIterator.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import org.apache.druid.java.util.common.RE;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator that returns S3 objects along with their bucket names.
+ * This is needed because AWS SDK v2's S3Object doesn't include the bucket name.
+ */
+public class ObjectSummaryWithBucketIterator implements Iterator<S3Utils.S3ObjectWithBucket>
+{
+ private final ServerSideEncryptingAmazonS3 s3Client;
+ private final Iterator<URI> prefixesIterator;
+ private final int maxListingLength;
+ private final int maxRetries;
+
+ private String currentBucket;
+ private String currentPrefix;
+ private String continuationToken;
+ private ListObjectsV2Response result;
+ private Iterator<S3Object> objectSummaryIterator;
+ private S3Utils.S3ObjectWithBucket currentObjectSummary;
+
+ ObjectSummaryWithBucketIterator(
+ final ServerSideEncryptingAmazonS3 s3Client,
+ final Iterable<URI> prefixes,
+ final int maxListingLength,
+ final int maxRetries
+ )
+ {
+this.s3Client = s3Client;
+this.prefixesIterator = prefixes.iterator();
+this.maxListingLength = maxListingLength;
+this.maxRetries = maxRetries;
+
+prepareNextRequest();
+fetchNextBatch();
+advanceObjectSummary();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+return currentObjectSummary != null;
+ }
+
+ @Override
+ public S3Utils.S3ObjectWithBucket next()
+ {
+if (currentObjectSummary == null) {
+ throw new NoSuchElementException();
+}
+
+final S3Utils.S3ObjectWithBucket retVal = currentObjectSummary;
+advanceObjectSummary();
+return retVal;
+ }
+
+ private void prepareNextRequest()
+ {
+final URI currentUri = prefixesIterator.next();
+currentBucket = currentUri.getAuthority();
+currentPrefix = S3Utils.extractS3Key(currentUri);
+continuationToken = null;
+ }
+
+ private void fetchNextBatch()
+ {
+try {
+ ListObjectsV2Request request = ListObjectsV2Request.builder()
+ .bucket(currentBucket)
+ .prefix(currentPrefix)
+ .maxKeys(maxListingLength)
+ .continuationToken(continuationToken)
+ .build();
+
+ result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request), maxRetries);
+ continuationToken = result.nextContinuationToken();
+ objectSummaryIterator = result.contents().iterator();
+}
+catch (S3Exception e) {
+ throw new RE(
+ e,
+ "Failed to get object summaries from S3 bucket[%s], prefix[%s]; S3 error: %s",
+ currentBucket,
+ currentPrefix,
+ e.getMessage()
+ );
+}
+catch (Exception e) {
+ throw new RE(
+ e,
+ "Failed to get object summaries from S3 bucket[%s], prefix[%s]",
+ currentBucket,
+ currentPrefix
+ );
+}
+ }
+
+ private void advanceObjectSummary()
+ {
+while (objectSummaryIterator.hasNext() || result.isTruncated() || prefixesIterator.hasNext()) {
+ while (objectSummaryIterator.hasNext()) {
+S3Object s3Object = objectSummaryIterator.next();
+// skips directories and empty objects
+if (!isDirectoryPlaceholder(s3Object) && s3Object.size() > 0) {
+ currentObjectSummary = new S3Utils.S3ObjectWithBucket(currentBucket, s3Object);
+ return;
+}
+ }
+
+ // Exhausted "objectSummaryIterator" without finding a non-placeholder.
+ if (result.isTruncated()) {
+fetchNextBatch();
+ } else if (prefixesIterator.hasNext()) {
+prepareNextRequest();
+fetchNextBatch();
+ }
+}
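The lookahead pattern in the quoted iterator (fetch a page, skip unwanted entries, move on to the next page when the current one is exhausted) can be sketched without the AWS SDK. This is a minimal, self-contained sketch and not the PR's code: `PagedLookaheadIteratorSketch`, `Page`, `fetchPage`, and `keep` are hypothetical names, and a `null` token stands in for "no more pages" where the PR checks `result.isTruncated()`. It also covers a single listing only, not multiple prefixes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch: iterates over paged results, holding one element of lookahead. */
final class PagedLookaheadIteratorSketch<T> implements Iterator<T>
{
  /** One page of results plus the token for the next page (null when there is none). */
  static final class Page<T>
  {
    final List<T> items;
    final String nextToken;

    Page(List<T> items, String nextToken)
    {
      this.items = items;
      this.nextToken = nextToken;
    }
  }

  private final Function<String, Page<T>> fetchPage;
  private final Predicate<T> keep;
  private Iterator<T> current;
  private String nextToken;
  private T lookahead;

  PagedLookaheadIteratorSketch(Function<String, Page<T>> fetchPage, Predicate<T> keep)
  {
    this.fetchPage = fetchPage;
    this.keep = keep;
    load(null); // the first request carries no continuation token
    advance();
  }

  @Override
  public boolean hasNext()
  {
    return lookahead != null;
  }

  @Override
  public T next()
  {
    if (lookahead == null) {
      throw new NoSuchElementException();
    }
    final T retVal = lookahead;
    advance();
    return retVal;
  }

  private void load(String token)
  {
    final Page<T> page = fetchPage.apply(token);
    current = page.items.iterator();
    nextToken = page.nextToken;
  }

  /** Moves lookahead to the next kept element, fetching further pages as needed. */
  private void advance()
  {
    while (true) {
      while (current.hasNext()) {
        final T candidate = current.next();
        if (keep.test(candidate)) {
          lookahead = candidate;
          return;
        }
      }
      if (nextToken == null) {
        lookahead = null; // every page is exhausted
        return;
      }
      load(nextToken);
    }
  }
}
```

Resetting the lookahead to `null` once every page is exhausted is what lets `hasNext()` stay a plain null check.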
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3882937296

Hi, update here: I've left this internally on a few of our clusters and things look good. Confirmed I've tested a superset of the following:

| # | Test | Status |
|---|------|--------|
| 1 | Native Batch Ingestion (index_parallel) | DONE |
| 2 | Streaming Ingestion (index_kafka) | DONE |
| 3 | Streaming Ingestion (index_kinesis) | TODO |
| 4 | MSQ SELECT with S3 Durable Storage | DONE |
| 5 | MSQ INSERT with S3 Shuffle Storage | DONE |
| 6 | Segment Mark as Unused + Kill Task + Nuke Segments | DONE |
| 7 | Task Log Deletion from S3 | DONE |

I will see if I can get Kinesis tested, but otherwise things are looking good. I will address the rest of the comments and then this should be ready to go.
Re: [PR] Upgrade to AWS SDK V2 (druid)
cryptoe commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3772022458

> Hi @cryptoe. Yes, I'm testing this on our production clusters running Kafka + S3. We don't have production clusters using Kinesis. I've only been able to run tests locally.

Thanks for this. Do keep us posted about the findings.

> I wonder if you folks might have some Kinesis clusters to test the new logic out on?

We also don't have active Kinesis clusters but let me get back to you on this.
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3750781545

Hi @cryptoe. Yes, I'm testing this on our production clusters running Kafka + S3. We don't have production clusters using Kinesis. I've only been able to run tests locally. I wonder if you folks might have some Kinesis clusters to test the new logic out on?
Re: [PR] Upgrade to AWS SDK V2 (druid)
cryptoe commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2690549813
##
cloud/aws-common/src/main/java/org/apache/druid/common/aws/FileSessionCredentialsProvider.java:
##
@@ -53,13 +53,12 @@ public FileSessionCredentialsProvider(String sessionCredentialsFile)
}
@Override
- public AWSCredentials getCredentials()
+ public AwsCredentials resolveCredentials()
{
return awsSessionCredentials;
}
- @Override
- public void refresh()
+ private void refresh()
Review Comment:
Who calls the refresh method now?
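The quoted hunk does not show what invokes `refresh()` once it is private. One common arrangement, sketched here purely as an assumption about how such a provider could work, is for the provider to own a scheduler that reloads the value periodically while the resolve method only returns the cached copy. `SelfRefreshingCredentialsSketch` and its `loader` parameter are hypothetical and use only the JDK, not the AWS SDK.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch: caches a value and refreshes it from an internal scheduler. */
final class SelfRefreshingCredentialsSketch<T> implements AutoCloseable
{
  private final Supplier<T> loader;
  private final ScheduledExecutorService scheduler;
  private volatile T current;

  SelfRefreshingCredentialsSketch(Supplier<T> loader, long period, TimeUnit unit)
  {
    this.loader = loader;
    // Load eagerly so resolve() never returns an unset value; a failure here propagates.
    this.current = loader.get();
    this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
      final Thread thread = new Thread(runnable, "credentials-refresh-sketch");
      thread.setDaemon(true);
      return thread;
    });
    // The provider itself is the only caller of refresh().
    this.scheduler.scheduleAtFixedRate(this::refresh, period, period, unit);
  }

  /** Returns the most recently loaded value without touching the source. */
  T resolve()
  {
    return current;
  }

  private void refresh()
  {
    try {
      current = loader.get();
    }
    catch (RuntimeException e) {
      // Keep serving the last good value; an uncaught exception would cancel the schedule.
    }
  }

  @Override
  public void close()
  {
    scheduler.shutdownNow();
  }
}
```

With this shape nothing outside the class needs a public `refresh()`, which matches the visibility change in the diff. Whether the PR actually wires it this way is not visible in the quoted lines.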
##
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##
@@ -474,45 +459,88 @@ public KinesisRecordSupplier(
records = new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
}
- public static AmazonKinesis getAmazonKinesisClient(
+ public static KinesisClient getAmazonKinesisClient(
String endpoint,
AWSCredentialsConfig awsCredentialsConfig,
String awsAssumedRoleArn,
String awsExternalId
)
{
-AWSCredentialsProvider awsCredentialsProvider = AWSCredentialsUtils.defaultAWSCredentialsProviderChain(
+AwsCredentialsProvider credentialsProvider = AWSCredentialsUtils.defaultAWSCredentialsProviderChain(
awsCredentialsConfig
);
+final Region regionFromEndpoint = parseRegionFromEndpoint(endpoint);
+
if (awsAssumedRoleArn != null) {
log.info("Assuming role [%s] with externalId [%s]", awsAssumedRoleArn, awsExternalId);
- STSAssumeRoleSessionCredentialsProvider.Builder builder = new STSAssumeRoleSessionCredentialsProvider
- .Builder(awsAssumedRoleArn, StringUtils.format("druid-kinesis-%s", UUID.randomUUID().toString()))
- .withStsClient(AWSSecurityTokenServiceClientBuilder.standard()
- .withCredentials(awsCredentialsProvider)
- .build());
+ AssumeRoleRequest.Builder assumeRoleBuilder = AssumeRoleRequest.builder()
+ .roleArn(awsAssumedRoleArn)
+ .roleSessionName(StringUtils.format("druid-kinesis-%s", UUID.randomUUID().toString()));
if (awsExternalId != null) {
-builder.withExternalId(awsExternalId);
+assumeRoleBuilder.externalId(awsExternalId);
+ }
+
+ StsClientBuilder stsClientBuilder = StsClient.builder()
+ .credentialsProvider(credentialsProvider);
+
+ if (regionFromEndpoint != null) {
+stsClientBuilder.region(regionFromEndpoint);
}
- awsCredentialsProvider = builder.build();
+ StsClient stsClient = stsClientBuilder.build();
+
+ credentialsProvider = StsAssumeRoleCredentialsProvider.builder()
+ .stsClient(stsClient)
+ .refreshRequest(assumeRoleBuilder.build())
+ .build();
}
-return AmazonKinesisClientBuilder.standard()
- .withCredentials(awsCredentialsProvider)
- .withClientConfiguration(new ClientConfiguration())
- .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
- endpoint,
- AwsHostNameUtils.parseRegion(
- endpoint,
- null
- )
- )).build();
+KinesisClientBuilder builder = KinesisClient.builder()
+.credentialsProvider(credentialsProvider);
+
+if (endpoint != null && !endpoint.isEmpty()) {
+ // Back-compat: historically this endpoint is often a hostname without a scheme
+ // (e.g. "kinesis.us-east-1.amazonaws.com"). SDK v2 requires a URI for endpointOverride.
+ final String endpointWithScheme = endpoint.contains("://") ? endpoint : "https://" + endpoint;
+ URI endpointUri = URI.create(endpointWithScheme);
+ builder.endpointOverride(endpointUri);
+}
+
+// SDK v2 requires a region; when endpoint matches AWS hostname pattern we can infer it.
+if (regionFromEndpoint != null) {
+ builder.region(regionFromEndpoint);
+}
+
+return builder.build();
}
+ /**
+ * Parse region from a Kinesis endpoint URL.
+ * Expected format: https://kinesis.{region}.amazonaws.com
+ */
+ private static Region parseRegionFromEndpoint(String endpoint)
Review Comment:
We should probably add tests for this function.
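As a starting point for such tests, here is a minimal stand-in for the function together with the cases it would need to cover. The body of `parseRegionFromEndpoint` is not shown in the quoted hunk, so everything below is an assumption: `KinesisEndpointRegionSketch`, `parseRegion`, and the regular expression are hypothetical, and the sketch returns the region as a plain `String` rather than the SDK's `Region` so that it runs without the AWS SDK.

```java
import java.net.URI;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the PR's parseRegionFromEndpoint. */
final class KinesisEndpointRegionSketch
{
  // Assumed hostname shape, taken from the javadoc in the diff: kinesis.{region}.amazonaws.com
  private static final Pattern KINESIS_HOST =
      Pattern.compile("^kinesis\\.([a-z0-9-]+)\\.amazonaws\\.com$");

  static Optional<String> parseRegion(String endpoint)
  {
    if (endpoint == null || endpoint.isEmpty()) {
      return Optional.empty();
    }
    // Same back-compat rule as the diff: a bare hostname gets an https:// scheme.
    final String withScheme = endpoint.contains("://") ? endpoint : "https://" + endpoint;
    final String host;
    try {
      host = URI.create(withScheme).getHost();
    }
    catch (IllegalArgumentException e) {
      return Optional.empty(); // not parseable as a URI
    }
    if (host == null) {
      return Optional.empty();
    }
    final Matcher matcher = KINESIS_HOST.matcher(host);
    return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
  }
}
```

Cases worth asserting against the real function would include an endpoint with a scheme, a bare hostname, a custom endpoint such as a local emulator URL (no region expected), and null or empty input.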
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryWithBucketIterator.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional inform
Re: [PR] Upgrade to AWS SDK V2 (druid)
cryptoe commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3749687108

Due to the criticality of the change, I think we should get 2 approvals for this PR. cc @jtuglu1
Re: [PR] Upgrade to AWS SDK V2 (druid)
jtuglu1 commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3747583395

👍 I'd still like to add 1-2 ITs that fully cover the newer S3 changes, but overall think it's basically there.
Re: [PR] Upgrade to AWS SDK V2 (druid)
Fly-Style commented on PR #18891:
URL: https://github.com/apache/druid/pull/18891#issuecomment-3744839150

Very good job @jtuglu1! I have no objections, PR looks good from my perspective!
Re: [PR] Upgrade to AWS SDK V2 (druid)
Fly-Style commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2686676081
##
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3ServerSideEncryption.java:
##
@@ -19,29 +19,20 @@
package org.apache.druid.storage.s3;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
class S3ServerSideEncryption implements ServerSideEncryption
{
@Override
- public PutObjectRequest decorate(PutObjectRequest request)
+ public PutObjectRequest.Builder decorate(PutObjectRequest.Builder builder)
{
-final ObjectMetadata objectMetadata = request.getMetadata() == null ?
- new ObjectMetadata() :
- request.getMetadata().clone();
-objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
-return request.withMetadata(objectMetadata);
+return builder.serverSideEncryption(software.amazon.awssdk.services.s3.model.ServerSideEncryption.AES256);
}
@Override
- public CopyObjectRequest decorate(CopyObjectRequest request)
+ public CopyObjectRequest.Builder decorate(CopyObjectRequest.Builder builder)
{
-final ObjectMetadata objectMetadata = request.getNewObjectMetadata() == null ?
- new ObjectMetadata() :
- request.getNewObjectMetadata().clone();
-objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
-return request.withNewObjectMetadata(objectMetadata);
+return builder.serverSideEncryption(software.amazon.awssdk.services.s3.model.ServerSideEncryption.AES256);
Review Comment:
Nit: do we need this prefix?
Re: [PR] Upgrade to AWS SDK V2 (druid)
Fly-Style commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2685840400
##
cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientUtilTest.java:
##
@@ -33,75 +31,79 @@ public class AWSClientUtilTest
@Test
public void testRecoverableException_IOException()
{
-Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(new AmazonClientException(new IOException())));
+Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(SdkClientException.builder().cause(new IOException()).build()));
}
@Test
public void testRecoverableException_RequestTimeout()
{
-AmazonServiceException ex = new AmazonServiceException(null);
-ex.setErrorCode("RequestTimeout");
+AwsServiceException ex = AwsServiceException.builder()
+.message("RequestTimeout")
+.awsErrorDetails(software.amazon.awssdk.awscore.exception.AwsErrorDetails.builder()
Review Comment:
Nit: is that prefix really necessary?
Re: [PR] Upgrade to AWS SDK V2 (druid)
Fly-Style commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2685815394
##
cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSCredentialsUtils.java:
##
@@ -19,26 +19,28 @@
package org.apache.druid.common.aws;
-import com.amazonaws.auth.AWSCredentialsProviderChain;
-import com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper;
-import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
-import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
-import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
+import software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
public class AWSCredentialsUtils
{
- public static AWSCredentialsProviderChain defaultAWSCredentialsProviderChain(final AWSCredentialsConfig config)
+ public static AwsCredentialsProvider defaultAWSCredentialsProviderChain(final AWSCredentialsConfig config)
Review Comment:
Nit: there is also an `AwsCredentialsProviderChain.of(...)` method; it might look cleaner.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
