Xiao-zhen-Liu commented on code in PR #5569:
URL: https://github.com/apache/texera/pull/5569#discussion_r3392496414
##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala:
##########
@@ -106,41 +109,44 @@ object S3StorageClient {
// Ensure the directory prefix ends with `/` to avoid accidental deletions
val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else directoryPrefix + "/"
- // List objects under the given prefix
- val listRequest = ListObjectsV2Request
- .builder()
- .bucket(bucketName)
- .prefix(prefix)
- .build()
-
- val listResponse = s3Client.listObjectsV2(listRequest)
-
- // Extract object keys
- val objectKeys = listResponse.contents().asScala.map(_.key())
-
- if (objectKeys.nonEmpty) {
- val objectsToDelete =
- objectKeys.map(key => ObjectIdentifier.builder().key(key).build()).asJava
-
- val deleteRequest = Delete
- .builder()
- .objects(objectsToDelete)
- .build()
-
- // Compute MD5 checksum for MinIO if required
- val md5Hash = MessageDigest
- .getInstance("MD5")
- .digest(deleteRequest.toString.getBytes("UTF-8"))
-
- // Convert object keys to S3 DeleteObjectsRequest format
- val deleteObjectsRequest = DeleteObjectsRequest
- .builder()
- .bucket(bucketName)
- .delete(deleteRequest)
- .build()
+ val listRequest = ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build()
+
+ // Paginate across all pages, then delete in batches within the per-request key limit.
+ s3Client
+ .listObjectsV2Paginator(listRequest)
+ .contents()
+ .asScala
+ .iterator
+ .map(obj => ObjectIdentifier.builder().key(obj.key()).build())
+ .grouped(MAX_KEYS_PER_DELETE_REQUEST)
+ .foreach { batch =>
+ val response = s3Client.deleteObjects(
+ DeleteObjectsRequest
+ .builder()
+ .bucket(bucketName)
+ .delete(Delete.builder().objects(batch.asJava).build())
+ .build()
+ )
+ throwOnDeleteErrors(prefix, response)
Review Comment:
Throwing on the first failed batch stops the loop, so no later batch is ever
attempted: one undeletable key early in the listing orphans every key that
comes after it. Since per-key failures don't affect other keys, an
alternative is to keep going through all batches, collect the failures, and
throw once at the end; a partial failure then leaves behind only the keys that
genuinely couldn't be deleted.
Fail-fast is defensible when the operation is retryable, but note that the
`DatasetResource` caller can't really retry: by the time this runs, the LakeFS
repo is already deleted, so a second attempt fails at `deleteRepo`. Maximizing
progress per attempt seems worth it there. Non-blocking.
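
To make the suggestion concrete, here is a minimal sketch of the
collect-then-throw shape. It deliberately leaves out the S3 client: `deleteAll`,
`deleteBatch`, and `BatchDeleteException` are hypothetical names for
illustration, and `deleteBatch` stands in for "issue one `DeleteObjects` call
and return the keys it reported as failed". How this would map onto
`throwOnDeleteErrors` is an assumption, not something taken from the PR.

```scala
object CollectFailuresSketch {

  /** Hypothetical exception type: raised once, after every batch was attempted. */
  final class BatchDeleteException(val failedKeys: Seq[String])
      extends RuntimeException(
        s"Failed to delete ${failedKeys.size} key(s), e.g. ${failedKeys.take(10).mkString(", ")}"
      )

  /**
    * Attempts every batch and only then reports failures.
    *
    * @param deleteBatch stand-in for one DeleteObjects call; returns the keys
    *                    that could not be deleted (empty when all succeeded)
    */
  def deleteAll(keys: Iterator[String], batchSize: Int)(
      deleteBatch: Seq[String] => Seq[String]
  ): Unit = {
    val failed = keys
      .grouped(batchSize)
      .flatMap(batch => deleteBatch(batch))
      .toVector // forces the lazy iterator, so no batch is skipped

    if (failed.nonEmpty) throw new BatchDeleteException(failed)
  }
}
```

With this shape, a failure in the first batch no longer prevents the second
and third batches from being sent; the caller still sees an exception, just
one that lists only the keys that were actually left behind.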
##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala:
##########
@@ -106,41 +109,44 @@ object S3StorageClient {
// Ensure the directory prefix ends with `/` to avoid accidental deletions
val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else directoryPrefix + "/"
Review Comment:
Not from this PR, but worth a follow-up: an empty `directoryPrefix`
normalizes to `"/"`, which matches only keys that literally begin with `/`
(normally none), so `deleteDirectory(bucket, "")` quietly deletes nothing. The
spec's `afterAll` calls exactly that, expecting a full-bucket cleanup that
never happens. Either reject empty prefixes or treat them as "the whole
bucket".
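
A minimal sketch of the "reject" option, assuming the normalization is pulled
into a helper. `normalizeDirectoryPrefix` is a hypothetical name, not an
existing method on `S3StorageClient`; the "whole bucket" option would instead
return the empty string unchanged for empty input.

```scala
object PrefixNormalizationSketch {

  /**
    * Normalizes a directory prefix so that it ends with "/".
    * Rejects the empty string instead of silently turning it into "/".
    */
  def normalizeDirectoryPrefix(directoryPrefix: String): String = {
    require(directoryPrefix.nonEmpty, "directoryPrefix must not be empty")
    if (directoryPrefix.endsWith("/")) directoryPrefix else directoryPrefix + "/"
  }
}
```

Rejecting makes the `afterAll` cleanup fail loudly with an
`IllegalArgumentException` rather than pass while leaving the bucket full.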
##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala:
##########
@@ -106,41 +109,44 @@ object S3StorageClient {
// Ensure the directory prefix ends with `/` to avoid accidental deletions
val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else directoryPrefix + "/"
- // List objects under the given prefix
- val listRequest = ListObjectsV2Request
- .builder()
- .bucket(bucketName)
- .prefix(prefix)
- .build()
-
- val listResponse = s3Client.listObjectsV2(listRequest)
-
- // Extract object keys
- val objectKeys = listResponse.contents().asScala.map(_.key())
-
- if (objectKeys.nonEmpty) {
- val objectsToDelete =
- objectKeys.map(key => ObjectIdentifier.builder().key(key).build()).asJava
-
- val deleteRequest = Delete
- .builder()
- .objects(objectsToDelete)
- .build()
-
- // Compute MD5 checksum for MinIO if required
- val md5Hash = MessageDigest
- .getInstance("MD5")
- .digest(deleteRequest.toString.getBytes("UTF-8"))
-
- // Convert object keys to S3 DeleteObjectsRequest format
- val deleteObjectsRequest = DeleteObjectsRequest
- .builder()
- .bucket(bucketName)
- .delete(deleteRequest)
- .build()
+ val listRequest = ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build()
+
+ // Paginate across all pages, then delete in batches within the per-request key limit.
+ s3Client
+ .listObjectsV2Paginator(listRequest)
+ .contents()
+ .asScala
+ .iterator
+ .map(obj => ObjectIdentifier.builder().key(obj.key()).build())
+ .grouped(MAX_KEYS_PER_DELETE_REQUEST)
+ .foreach { batch =>
+ val response = s3Client.deleteObjects(
+ DeleteObjectsRequest
+ .builder()
+ .bucket(bucketName)
+ .delete(Delete.builder().objects(batch.asJava).build())
Review Comment:
Minor: consider `.quiet(true)` on the `DeleteObjects` request. By default it
echoes every successfully deleted key back in the response, so each 1000-key
batch returns a large body we never read. Quiet mode returns only the failures
— exactly what `throwOnDeleteErrors` looks at.
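
For reference, a sketch of where the flag would go. It assumes the AWS SDK for
Java v2 S3 module is on the classpath (as it is for this file) and Scala 2.13's
`scala.jdk.CollectionConverters`; `buildQuietDeleteRequest` is a hypothetical
helper used only to keep the example self-contained.

```scala
import software.amazon.awssdk.services.s3.model.{Delete, DeleteObjectsRequest, ObjectIdentifier}

import scala.jdk.CollectionConverters._

object QuietDeleteSketch {

  /** Builds a DeleteObjects request whose response lists only failed keys. */
  def buildQuietDeleteRequest(bucketName: String, keys: Seq[String]): DeleteObjectsRequest = {
    val identifiers = keys.map(key => ObjectIdentifier.builder().key(key).build())
    DeleteObjectsRequest
      .builder()
      .bucket(bucketName)
      // quiet(true): the service omits the per-key success entries from the response
      .delete(Delete.builder().objects(identifiers.asJava).quiet(true).build())
      .build()
  }
}
```

In quiet mode the response's `errors()` list is still populated for failed
keys, so error handling that reads only that list should be unaffected.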
##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala:
##########
@@ -106,41 +109,44 @@ object S3StorageClient {
// Ensure the directory prefix ends with `/` to avoid accidental deletions
Review Comment:
"prefix" is used throughout this method and `throwOnDeleteErrors` but never
explained, and it's S3 jargon, not a general term. For someone new to this
code: S3 keys are a flat namespace with no real directories — a "directory" is
just a shared key prefix, and deleting it means deleting every object whose key
begins with that string. A sentence to that effect in the doc would anchor all
the later uses of "prefix", and would explain why the trailing `/` matters (so
prefix `a/b` matches `a/b/file` but not the unrelated `a/bc/file`) — which is
what the "avoid accidental deletions" comment is really about. Also worth
dropping "must end with `/`" from the `@param`, since the method normalizes it
for you now.
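
The trailing-slash point can be shown without S3 at all. The sketch below
models a bucket as a plain list of keys; `keysUnder` is a hypothetical helper
that mirrors the normalization in the diff and then applies the same
"key starts with prefix" rule that an S3 listing uses.

```scala
object PrefixMatchSketch {

  /** Returns the keys that a "directory" delete for `directoryPrefix` would target. */
  def keysUnder(keys: Seq[String], directoryPrefix: String): Seq[String] = {
    val prefix =
      if (directoryPrefix.endsWith("/")) directoryPrefix else directoryPrefix + "/"
    keys.filter(_.startsWith(prefix))
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("a/b/file", "a/bc/file", "a/b.txt")

    // With the trailing slash enforced, only the real children of a/b match.
    println(keysUnder(keys, "a/b"))

    // Without it, the raw prefix "a/b" also matches the unrelated siblings.
    println(keys.filter(_.startsWith("a/b")))
  }
}
```

The second `println` is the accidental-deletion case the code comment warns
about: every key in the list begins with the bare string `a/b`.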
##########
common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala:
##########
@@ -334,4 +338,91 @@ class S3StorageClientSpec
S3StorageClient.deleteObject(testBucketName, objectKey)
}
+
+ // ========================================
+ // deleteDirectory Tests
+ // ========================================
+
+ test("deleteDirectory should delete all objects under a prefix") {
+ val prefix = "delete-dir/small"
+ val keys = (0 until 5).map(i => s"$prefix/object-$i.txt")
+ keys.foreach(key =>
+ S3StorageClient.uploadObject(testBucketName, key, createInputStream("data"))
+ )
+
+ assert(S3StorageClient.directoryExists(testBucketName, prefix))
+
+ S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+ assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+ }
+
+ test("deleteDirectory should delete more than 1000 objects under a prefix") {
+ // >1000 objects exercises pagination and delete batching; without them the tail is orphaned.
+ val prefix = "delete-dir/large"
+ val objectCount = 1001
+
+ // Upload concurrently to keep the test reasonably fast.
+ val pool = Executors.newFixedThreadPool(16)
+ implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
+ try {
+ val uploads = (0 until objectCount).map { i =>
+ Future {
+ S3StorageClient.uploadObject(
+ testBucketName,
+ f"$prefix/object-$i%05d.txt",
+ createInputStream("")
+ )
+ }
+ }
+ Await.result(Future.sequence(uploads), 5.minutes)
+ } finally {
+ pool.shutdown()
+ }
+
+ assert(S3StorageClient.directoryExists(testBucketName, prefix))
+
+ S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+ assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+ }
+
+ test("deleteDirectory should not throw for a prefix with no objects") {
+ // Empty listing: no DeleteObjects request is issued.
+ S3StorageClient.deleteDirectory(testBucketName, "delete-dir/non-existent")
+ }
+
+ test("deleteDirectory should surface per-key delete failures rather than swallow them") {
Review Comment:
Two small things: the test name says `deleteDirectory` but it exercises
`throwOnDeleteErrors` directly — naming it after the helper would be more
accurate. And it packs three scenarios (single failure, no errors, capped list)
into one test; splitting them would make a failure point straight at the broken
scenario.
##########
common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala:
##########
@@ -334,4 +338,91 @@ class S3StorageClientSpec
S3StorageClient.deleteObject(testBucketName, objectKey)
}
+
+ // ========================================
+ // deleteDirectory Tests
+ // ========================================
+
+ test("deleteDirectory should delete all objects under a prefix") {
Review Comment:
Cheap coverage win: the trailing-slash guard ("avoid accidental deletions")
isn't pinned by any test. Upload a sibling object here, e.g.
`delete-dir/small-sibling.txt`, then assert it still exists after deleting
`delete-dir/small`. That locks in the behavior the comment promises.