Xiao-zhen-Liu commented on code in PR #5280:
URL: https://github.com/apache/texera/pull/5280#discussion_r3368443833


##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala:
##########
@@ -20,44 +20,88 @@
 package org.apache.texera.service.util
 
 import com.typesafe.scalalogging.LazyLogging
+import org.apache.texera.amber.config.StorageConfig
 
 import java.util.UUID
 
-/**
-  * Manages the lifecycle of LargeBinaries stored in S3.
-  *
-  * Handles creation and deletion of large objects that exceed
-  * normal tuple size limits.
-  */
+/** Manages the lifecycle of LargeBinaries (objects too large for normal 
tuples) in S3. */
 object LargeBinaryManager extends LazyLogging {
-  val DEFAULT_BUCKET: String = "texera-large-binaries"
+  // From config so the JVM/Python workers and cleanup all share one bucket.
+  val DEFAULT_BUCKET: String = StorageConfig.s3LargeBinariesBucket
+
+  /** Per-execution key prefix; the single source of truth for the write and 
delete paths. */
+  private def executionPrefix(executionId: Long): String = 
s"objects/$executionId"
 
   /**
-    * Creates a new LargeBinary reference.
-    * The actual data upload happens separately via LargeBinaryOutputStream.
+    * Base URI (trailing slash) under which `executionId`'s large binaries 
live; create()
+    * appends a unique suffix. Empty when the bucket is unconfigured, so 
create() fails loudly.
     *
-    * @return S3 URI string for the new LargeBinary (format: s3://bucket/key)
+    * `executionId` must be a persisted EID. The sentinel id 
(DEFAULT_EXECUTION_ID = 1) shares

Review Comment:
   Good that this is called out. Since the assumption isn't enforced in code, 
worth confirming a default-context run (`eid = DEFAULT_EXECUTION_ID = 1`) can 
never create large binaries while a real execution 1 exists — otherwise the two 
share `objects/1/` and one's cleanup wipes the other's. If that can't be ruled 
out, a cheap guard (reject `eid == DEFAULT_EXECUTION_ID`, or assert a persisted 
eid) would make it safe. Non-blocking.



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala:
##########
@@ -91,6 +95,10 @@ class DPThread(
       dpThread = dpThreadExecutor.submit(new Runnable() {
         def run(): Unit = {
           Thread.currentThread().setName(getThreadName)
+          // Seed this thread's large-binary base URI (from WorkerConfig) 
before any tuple,
+          // so create() can append a suffix without the execution id. Once 
per thread,
+          // assuming a thread serves one execution.
+          LargeBinaryManager.setCurrentBaseUri(largeBinaryBaseUri)

Review Comment:
   Minor: the manager's behavior is unit-tested, but I don't see a test that 
this seeding is actually wired — i.e. a worker constructed with a base URI 
produces execution-scoped keys from `create()`. The wiring is trivial so it's 
low priority, but a small test would lock it in against future refactors.



##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala:
##########
@@ -20,44 +20,88 @@
 package org.apache.texera.service.util
 
 import com.typesafe.scalalogging.LazyLogging
+import org.apache.texera.amber.config.StorageConfig
 
 import java.util.UUID
 
-/**
-  * Manages the lifecycle of LargeBinaries stored in S3.
-  *
-  * Handles creation and deletion of large objects that exceed
-  * normal tuple size limits.
-  */
+/** Manages the lifecycle of LargeBinaries (objects too large for normal 
tuples) in S3. */
 object LargeBinaryManager extends LazyLogging {
-  val DEFAULT_BUCKET: String = "texera-large-binaries"
+  // From config so the JVM/Python workers and cleanup all share one bucket.
+  val DEFAULT_BUCKET: String = StorageConfig.s3LargeBinariesBucket
+
+  /** Per-execution key prefix; the single source of truth for the write and 
delete paths. */
+  private def executionPrefix(executionId: Long): String = 
s"objects/$executionId"
 
   /**
-    * Creates a new LargeBinary reference.
-    * The actual data upload happens separately via LargeBinaryOutputStream.
+    * Base URI (trailing slash) under which `executionId`'s large binaries 
live; create()
+    * appends a unique suffix. Empty when the bucket is unconfigured, so 
create() fails loudly.
     *
-    * @return S3 URI string for the new LargeBinary (format: s3://bucket/key)
+    * `executionId` must be a persisted EID. The sentinel id 
(DEFAULT_EXECUTION_ID = 1) shares
+    * this space, so binaries must only be created under a real execution — 
else execution 1
+    * and a default-context run would collide under objects/1/.
     */
-  def create(): String = {
-    val objectKey = 
s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}"
-    val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
+  def baseUriForExecution(executionId: Long): String =
+    if (DEFAULT_BUCKET.isEmpty) ""
+    else s"s3://$DEFAULT_BUCKET/${executionPrefix(executionId)}/"
+
+  /**
+    * Base URI for binaries created on the current thread — thread-local 
because create()
+    * runs on the DP thread. Seeded once at DP-thread start, assuming a thread 
serves one
+    * execution; re-seed if workers are ever reused across executions.
+    */
+  private val currentBaseUri: ThreadLocal[Option[String]] =
+    ThreadLocal.withInitial(() => Option.empty[String])
+
+  /** Sets the current thread's base URI; an empty value clears it, so 
create() fails loudly. */
+  def setCurrentBaseUri(baseUri: String): Unit =
+    currentBaseUri.set(Option(baseUri).filter(_.nonEmpty))
 
-    uri
+  /**
+    * Creates a LargeBinary reference by appending a unique suffix to the 
current thread's
+    * base URI. Data is uploaded separately via LargeBinaryOutputStream.
+    *
+    * @return e.g. s3://bucket/objects/{eid}/{uuid}
+    */
+  def create(): String = {
+    val baseUri = currentBaseUri
+      .get()
+      .getOrElse(
+        throw new IllegalStateException(
+          "LargeBinaryManager.create() requires a base URI, " +
+            "but none was set on the current thread."
+        )
+      )
+    s"$baseUri${UUID.randomUUID()}"
   }
 
   /**
-    * Deletes all large binaries from the bucket.
+    * Deletes all large binaries for one execution. Uses deleteDirectory, 
which removes one
+    * listing page (<= 1000 objects) — enough for expected counts; more needs 
a paginated delete.

Review Comment:
   `deleteDirectory` lists with a single `listObjectsV2` and doesn't follow the 
continuation token, so this deletes only the first ≤1000 objects — an execution 
with more than 1000 large binaries would leak the rest. It's reachable: 
`FileScanUtils` creates one `LargeBinary` per file, so a scan over >1000 files 
in large-binary mode exceeds it. Not introduced here (the old 
`deleteAllObjects` had the same cap), so it needn't block this PR — but worth a 
tracked follow-up to paginate the delete, and it likely affects other 
`deleteDirectory` callers too.



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala:
##########
@@ -209,7 +209,8 @@ class PythonWorkflowWorker(
         StorageConfig.s3Endpoint,
         StorageConfig.s3Region,
         StorageConfig.s3Username,
-        StorageConfig.s3Password
+        StorageConfig.s3Password,
+        workerConfig.largeBinaryBaseUri

Review Comment:
   Minor: this adds a 20th positional arg that `texera_run_python_worker.py` 
unpacks by position — fragile if the two ever drift. It follows the existing 
convention so it's fine here, but the growing positional list may be worth 
moving to named args at some point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to