kunwp1 commented on code in PR #5280:
URL: https://github.com/apache/texera/pull/5280#discussion_r3366010650


##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala:
##########
@@ -33,31 +33,63 @@ object LargeBinaryManager extends LazyLogging {
   val DEFAULT_BUCKET: String = "texera-large-binaries"
 
   /**
-    * Creates a new LargeBinary reference.
+    * Worker-scoped execution context. It is set on the data-processing thread when an
+    * executor is initialized.
+    */
+  private val currentExecutionId: ThreadLocal[Option[Long]] =
+    ThreadLocal.withInitial(() => Option.empty[Long])
+
+  /** Sets the execution id for large binaries created on the current thread. */
+  def setCurrentExecutionId(executionId: Long): Unit =
+    currentExecutionId.set(Some(executionId))
+
+  /**
+    * Creates a new LargeBinary reference scoped to the current execution.
     * The actual data upload happens separately via LargeBinaryOutputStream.
     *
-    * @return S3 URI string for the new LargeBinary (format: s3://bucket/key)
+    * @return S3 URI string for the new LargeBinary (format: s3://bucket/objects/{eid}/{uuid})
     */
   def create(): String = {
-    val objectKey = s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}"
-    val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
-
-    uri
+    val eid = currentExecutionId
+      .get()
+      .getOrElse(
+        throw new IllegalStateException(
+          "LargeBinaryManager.create() requires an execution context, " +
+            "but none was set on the current thread."
+        )
+      )
+    val objectKey = s"objects/$eid/${UUID.randomUUID()}"
+    s"s3://$DEFAULT_BUCKET/$objectKey"
   }
 
   /**
-    * Deletes all large binaries from the bucket.
+    * Deletes all large binaries belonging to a single execution.
     *
-    * @throws java.lang.Exception if the deletion fails
-    * @return Unit
+    * @param executionId the execution whose large binaries should be removed
+    */
+  def deleteByExecution(executionId: Long): Unit =
+    deleteByExecution(executionId, S3StorageClient.deleteDirectory)
+
+  /**
+    * Overload that takes the directory-delete operation as a parameter. Visible for
+    * testing
     */
-  def deleteAllObjects(): Unit = {
+  private[util] def deleteByExecution(
+      executionId: Long,
+      deleteDir: (String, String) => Unit
+  ): Unit = {
     try {
-      S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects")
-      logger.info(s"Successfully deleted all large binaries from bucket: $DEFAULT_BUCKET")
+      deleteDir(DEFAULT_BUCKET, s"objects/$executionId")
+      logger.info(
+        s"Deleted large binaries for execution $executionId from bucket: $DEFAULT_BUCKET"
+      )
     } catch {
       case e: Exception =>

Review Comment:
   Changed to `error`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to