Xiao-zhen-Liu commented on code in PR #5569:
URL: https://github.com/apache/texera/pull/5569#discussion_r3392496414


##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala:
##########
@@ -106,41 +109,44 @@ object S3StorageClient {
     // Ensure the directory prefix ends with `/` to avoid accidental deletions
     val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else 
directoryPrefix + "/"
 
-    // List objects under the given prefix
-    val listRequest = ListObjectsV2Request
-      .builder()
-      .bucket(bucketName)
-      .prefix(prefix)
-      .build()
-
-    val listResponse = s3Client.listObjectsV2(listRequest)
-
-    // Extract object keys
-    val objectKeys = listResponse.contents().asScala.map(_.key())
-
-    if (objectKeys.nonEmpty) {
-      val objectsToDelete =
-        objectKeys.map(key => 
ObjectIdentifier.builder().key(key).build()).asJava
-
-      val deleteRequest = Delete
-        .builder()
-        .objects(objectsToDelete)
-        .build()
-
-      // Compute MD5 checksum for MinIO if required
-      val md5Hash = MessageDigest
-        .getInstance("MD5")
-        .digest(deleteRequest.toString.getBytes("UTF-8"))
-
-      // Convert object keys to S3 DeleteObjectsRequest format
-      val deleteObjectsRequest = DeleteObjectsRequest
-        .builder()
-        .bucket(bucketName)
-        .delete(deleteRequest)
-        .build()
+    val listRequest = 
ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build()
+
+    // Paginate across all pages, then delete in batches within the 
per-request key limit.
+    s3Client
+      .listObjectsV2Paginator(listRequest)
+      .contents()
+      .asScala
+      .iterator
+      .map(obj => ObjectIdentifier.builder().key(obj.key()).build())
+      .grouped(MAX_KEYS_PER_DELETE_REQUEST)
+      .foreach { batch =>
+        val response = s3Client.deleteObjects(
+          DeleteObjectsRequest
+            .builder()
+            .bucket(bucketName)
+            .delete(Delete.builder().objects(batch.asJava).build())
+            .build()
+        )
+        throwOnDeleteErrors(prefix, response)

Review Comment:
   Throwing on the first failed batch stops the loop, so every batch after it 
is never attempted — one undeletable key early in the listing leaves everything 
behind it leaked. Since per-key failures don't affect other keys, an 
alternative is to keep going through all batches, collect the failures, and 
throw once at the end; a partial failure then leaves behind only the keys that 
genuinely couldn't be deleted.
   
   Fail-fast is defensible since the operation is retryable — but note the 
`DatasetResource` caller can't really retry: by the time this runs, the LakeFS 
repo is already deleted, so a second attempt fails at `deleteRepo`. Maximizing 
progress per attempt seems worth it there. Non-blocking.



##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala:
##########
@@ -106,41 +109,44 @@ object S3StorageClient {
     // Ensure the directory prefix ends with `/` to avoid accidental deletions
     val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else 
directoryPrefix + "/"

Review Comment:
   Not from this PR, but worth a follow-up: an empty `directoryPrefix` 
normalizes to `"/"`, which matches no keys, so `deleteDirectory(bucket, "")` 
quietly deletes nothing. The spec's `afterAll` calls exactly that, expecting a 
full-bucket cleanup that never happens. Either reject empty prefixes or treat 
them as "the whole bucket".



##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala:
##########
@@ -106,41 +109,44 @@ object S3StorageClient {
     // Ensure the directory prefix ends with `/` to avoid accidental deletions
     val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else 
directoryPrefix + "/"
 
-    // List objects under the given prefix
-    val listRequest = ListObjectsV2Request
-      .builder()
-      .bucket(bucketName)
-      .prefix(prefix)
-      .build()
-
-    val listResponse = s3Client.listObjectsV2(listRequest)
-
-    // Extract object keys
-    val objectKeys = listResponse.contents().asScala.map(_.key())
-
-    if (objectKeys.nonEmpty) {
-      val objectsToDelete =
-        objectKeys.map(key => 
ObjectIdentifier.builder().key(key).build()).asJava
-
-      val deleteRequest = Delete
-        .builder()
-        .objects(objectsToDelete)
-        .build()
-
-      // Compute MD5 checksum for MinIO if required
-      val md5Hash = MessageDigest
-        .getInstance("MD5")
-        .digest(deleteRequest.toString.getBytes("UTF-8"))
-
-      // Convert object keys to S3 DeleteObjectsRequest format
-      val deleteObjectsRequest = DeleteObjectsRequest
-        .builder()
-        .bucket(bucketName)
-        .delete(deleteRequest)
-        .build()
+    val listRequest = 
ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build()
+
+    // Paginate across all pages, then delete in batches within the 
per-request key limit.
+    s3Client
+      .listObjectsV2Paginator(listRequest)
+      .contents()
+      .asScala
+      .iterator
+      .map(obj => ObjectIdentifier.builder().key(obj.key()).build())
+      .grouped(MAX_KEYS_PER_DELETE_REQUEST)
+      .foreach { batch =>
+        val response = s3Client.deleteObjects(
+          DeleteObjectsRequest
+            .builder()
+            .bucket(bucketName)
+            .delete(Delete.builder().objects(batch.asJava).build())

Review Comment:
   Minor: consider `.quiet(true)` on the `DeleteObjects` request. By default it 
echoes every successfully deleted key back in the response, so each 1000-key 
batch returns a large body we never read. Quiet mode returns only the failures 
— exactly what `throwOnDeleteErrors` looks at.



##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala:
##########
@@ -106,41 +109,44 @@ object S3StorageClient {
     // Ensure the directory prefix ends with `/` to avoid accidental deletions

Review Comment:
   "prefix" is used throughout this method and `throwOnDeleteErrors` but never 
explained, and it's S3 jargon, not a general term. For someone new to this 
code: S3 keys are a flat namespace with no real directories — a "directory" is 
just a shared key prefix, and deleting it means deleting every object whose key 
begins with that string. A sentence to that effect in the doc would anchor all 
the later uses of "prefix", and would explain why the trailing `/` matters (so 
prefix `a/b` matches `a/b/file` but not the unrelated `a/bc/file`) — which is 
what the "avoid accidental deletions" comment is really about. Also worth 
dropping "must end with `/`" from the `@param`, since the method normalizes it 
for you now.



##########
common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala:
##########
@@ -334,4 +338,91 @@ class S3StorageClientSpec
 
     S3StorageClient.deleteObject(testBucketName, objectKey)
   }
+
+  // ========================================
+  // deleteDirectory Tests
+  // ========================================
+
+  test("deleteDirectory should delete all objects under a prefix") {
+    val prefix = "delete-dir/small"
+    val keys = (0 until 5).map(i => s"$prefix/object-$i.txt")
+    keys.foreach(key =>
+      S3StorageClient.uploadObject(testBucketName, key, 
createInputStream("data"))
+    )
+
+    assert(S3StorageClient.directoryExists(testBucketName, prefix))
+
+    S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+    assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+  }
+
+  test("deleteDirectory should delete more than 1000 objects under a prefix") {
+    // >1000 objects exercises pagination and delete batching; without them 
the tail is orphaned.
+    val prefix = "delete-dir/large"
+    val objectCount = 1001
+
+    // Upload concurrently to keep the test reasonably fast.
+    val pool = Executors.newFixedThreadPool(16)
+    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
+    try {
+      val uploads = (0 until objectCount).map { i =>
+        Future {
+          S3StorageClient.uploadObject(
+            testBucketName,
+            f"$prefix/object-$i%05d.txt",
+            createInputStream("")
+          )
+        }
+      }
+      Await.result(Future.sequence(uploads), 5.minutes)
+    } finally {
+      pool.shutdown()
+    }
+
+    assert(S3StorageClient.directoryExists(testBucketName, prefix))
+
+    S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+    assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+  }
+
+  test("deleteDirectory should not throw for a prefix with no objects") {
+    // Empty listing: no DeleteObjects request is issued.
+    S3StorageClient.deleteDirectory(testBucketName, "delete-dir/non-existent")
+  }
+
+  test("deleteDirectory should surface per-key delete failures rather than 
swallow them") {

Review Comment:
   Two small things: the test name says `deleteDirectory` but it exercises 
`throwOnDeleteErrors` directly — naming it after the helper would be more 
accurate. And it packs three scenarios (single failure, no errors, capped list) 
into one test; splitting them would make a failure point straight at the broken 
scenario.



##########
common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala:
##########
@@ -334,4 +338,91 @@ class S3StorageClientSpec
 
     S3StorageClient.deleteObject(testBucketName, objectKey)
   }
+
+  // ========================================
+  // deleteDirectory Tests
+  // ========================================
+
+  test("deleteDirectory should delete all objects under a prefix") {

Review Comment:
   Cheap coverage win: the trailing-slash guard ("avoid accidental deletions") 
isn't pinned by any test. Upload a sibling object here, e.g. 
`delete-dir/small-sibling.txt`, then assert it still exists after deleting 
`delete-dir/small`. That locks in the behavior the comment promises.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to