Xiao-zhen-Liu commented on code in PR #5280:
URL: https://github.com/apache/texera/pull/5280#discussion_r3358425311


##########
amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala:
##########
@@ -611,7 +611,8 @@ class WorkflowResource extends LazyLogging {
         .asScala
         .toList
 
-      LargeBinaryManager.deleteAllObjects()
+      // Delete large binaries for each execution belonging to the workflows 
being removed
+      eids.foreach(eid => 
LargeBinaryManager.deleteByExecution(eid.longValue()))

Review Comment:
   The large binaries are deleted here, but the database deletion happens in 
the transaction at line 632. If that transaction rolls back (e.g. one of the 
workflow ids doesn't belong to the user), the workflow rows survive but their 
binaries are already gone. The document cleanup just below already runs *after* 
the transaction for this exact reason — I'd move this `deleteByExecution` call 
down next to it.



##########
amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto:
##########
@@ -252,6 +252,7 @@ message InitializeExecutorRequest {
   int32 totalWorkerCount = 1;
   core.OpExecInitInfo opExecInitInfo = 2;
   bool isSource = 3;
+  core.ExecutionIdentity executionId = 4;

Review Comment:
   This field tells the worker which execution it's in, so `create()` can put 
the execution id in the S3 key. It works, but it makes large binaries the odd 
one out.
   
   Every other execution-scoped resource (results, state, console messages) is 
named by the controller — which already has the execution id — and the finished 
location is handed to the worker as data, so the worker never needs the id. 
Large binaries are the exception: the worker builds the location itself in 
`create()`, which is the only reason we now push the execution id down to 
workers (this field, the handler change, and the per-engine id state all exist 
just for that).
   
   The consistent fix is to hand the worker a base location the same way the 
controller already hands down port locations, and let `create()` append a 
unique id — then this field and the worker-side state go away. (Large binaries 
being created on the fly only stops the controller from pre-creating the 
objects, not from handing down the location.)
   
   This is bigger than a bug fix, so it's your call whether to do it now or in 
a follow-up — what's here works fine. Flagging it because this approach puts 
state on the worker that the cleaner design would later remove.



##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala:
##########
@@ -33,31 +33,63 @@ object LargeBinaryManager extends LazyLogging {
   val DEFAULT_BUCKET: String = "texera-large-binaries"
 
   /**
-    * Creates a new LargeBinary reference.
+    * Worker-scoped execution context. It is set on the data-processing thread 
when an
+    * executor is initialized.
+    */
+  private val currentExecutionId: ThreadLocal[Option[Long]] =

Review Comment:
   This whole mechanism depends on the execution id being set and read on the 
same worker thread, but that requirement isn't written down anywhere. Please 
state it plainly here — something like "must be set on, and read from, the 
worker's data-processing thread." (Minor: the Python manager's docstring says 
it "Mirrors the JVM object LargeBinaryManager"; the two actually use different 
mechanisms — a thread-local here vs. a process-wide singleton there — so it's 
worth noting they differ rather than match.)



##########
amber/src/test/python/pytexera/storage/test_large_binary_manager.py:
##########
@@ -152,3 +162,35 @@ def test_create_uses_default_bucket(self):
 
             uri = large_binary_manager.create()
             assert large_binary_manager.DEFAULT_BUCKET in uri
+            assert 
f"objects/{large_binary_manager.get_current_execution_id()}/" in uri
+
+
+def test_create_stamps_execution_id(monkeypatch):
+    # Avoid touching real S3 while testing key generation.
+    monkeypatch.setattr(
+        large_binary_manager, "_ensure_bucket_exists", lambda bucket: None
+    )
+    monkeypatch.setattr(large_binary_manager, "_current_execution_id", 42)

Review Comment:
   This sets the private `_current_execution_id` directly, while the class 
fixture above uses the public setter. Using the setter here too would be more 
consistent (and safer if the internals change).



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala:
##########
@@ -44,6 +45,7 @@ trait InitializeExecutorHandler {
         )
       )
     cachedTotalWorkerCount = req.totalWorkerCount
+    req.executionId.foreach(eid => 
LargeBinaryManager.setCurrentExecutionId(eid.id))

Review Comment:
   The two engines disagree when the id is missing: here, `foreach` does 
nothing and the previous value stays; the Python handler stores `None`, which 
makes the next `create()` fail loudly. Worth making them behave the same. The 
cleanest fix is to make the proto field required (`no_box`), which removes the 
"missing id" case entirely and lets you drop the `Some(...)` wrapper at 
`RegionExecutionCoordinator.scala:408`. (Relevant only if the current approach 
stays — see the design note on the proto field.)



##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala:
##########
@@ -33,31 +33,63 @@ object LargeBinaryManager extends LazyLogging {
   val DEFAULT_BUCKET: String = "texera-large-binaries"
 
   /**
-    * Creates a new LargeBinary reference.
+    * Worker-scoped execution context. It is set on the data-processing thread 
when an
+    * executor is initialized.
+    */
+  private val currentExecutionId: ThreadLocal[Option[Long]] =
+    ThreadLocal.withInitial(() => Option.empty[Long])
+
+  /** Sets the execution id for large binaries created on the current thread. 
*/
+  def setCurrentExecutionId(executionId: Long): Unit =
+    currentExecutionId.set(Some(executionId))
+
+  /**
+    * Creates a new LargeBinary reference scoped to the current execution.
     * The actual data upload happens separately via LargeBinaryOutputStream.
     *
-    * @return S3 URI string for the new LargeBinary (format: s3://bucket/key)
+    * @return S3 URI string for the new LargeBinary (format: 
s3://bucket/objects/{eid}/{uuid})
     */
   def create(): String = {
-    val objectKey = 
s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}"
-    val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
-
-    uri
+    val eid = currentExecutionId
+      .get()
+      .getOrElse(
+        throw new IllegalStateException(
+          "LargeBinaryManager.create() requires an execution context, " +
+            "but none was set on the current thread."
+        )
+      )
+    val objectKey = s"objects/$eid/${UUID.randomUUID()}"
+    s"s3://$DEFAULT_BUCKET/$objectKey"
   }
 
   /**
-    * Deletes all large binaries from the bucket.
+    * Deletes all large binaries belonging to a single execution.
     *
-    * @throws java.lang.Exception if the deletion fails
-    * @return Unit
+    * @param executionId the execution whose large binaries should be removed
+    */
+  def deleteByExecution(executionId: Long): Unit =
+    deleteByExecution(executionId, S3StorageClient.deleteDirectory)
+
+  /**
+    * Overload that takes the directory-delete operation as a parameter. 
Visible for
+    * testing
     */
-  def deleteAllObjects(): Unit = {
+  private[util] def deleteByExecution(
+      executionId: Long,
+      deleteDir: (String, String) => Unit
+  ): Unit = {
     try {
-      S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects")
-      logger.info(s"Successfully deleted all large binaries from bucket: 
$DEFAULT_BUCKET")
+      deleteDir(DEFAULT_BUCKET, s"objects/$executionId")
+      logger.info(
+        s"Deleted large binaries for execution $executionId from bucket: 
$DEFAULT_BUCKET"
+      )
     } catch {
       case e: Exception =>

Review Comment:
   This catches every exception and logs a single WARN, so a real failure (bad 
credentials, unreachable endpoint, partial delete) silently leaves storage 
behind. If swallowing is intentional — so a cleanup hiccup doesn't fail the 
workflow delete — that's reasonable, but please log at `error` and/or say so in 
a comment, otherwise leaked binaries are invisible.



##########
amber/src/main/python/pytexera/storage/large_binary_manager.py:
##########
@@ -22,57 +22,94 @@
 and LargeBinaryInputStream/LargeBinaryOutputStream instead.
 """
 
-import time
 import uuid
 from loguru import logger
 from core.storage.storage_config import StorageConfig
 
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
 
+class LargeBinaryManager:
+    """Manages execution-scoped large binaries in S3 for a worker process.
 
-def _get_s3_client():
-    """Get or initialize S3 client (lazy initialization, cached)."""
-    global _s3_client
-    if _s3_client is None:
-        try:
-            import boto3
-            from botocore.config import Config
-        except ImportError as e:
-            raise RuntimeError("boto3 required. Install with: pip install 
boto3") from e
-
-        _s3_client = boto3.client(
-            "s3",
-            endpoint_url=StorageConfig.S3_ENDPOINT,
-            aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
-            aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
-            region_name=StorageConfig.S3_REGION,
-            config=Config(signature_version="s3v4", s3={"addressing_style": 
"path"}),
-        )
-    return _s3_client
-
-
-def _ensure_bucket_exists(bucket: str):
-    """Ensure S3 bucket exists, creating it if necessary."""
-    s3 = _get_s3_client()
-    try:
-        s3.head_bucket(Bucket=bucket)
-    except s3.exceptions.NoSuchBucket:
-        logger.debug(f"Bucket {bucket} not found, creating it")
-        s3.create_bucket(Bucket=bucket)
-        logger.info(f"Created bucket: {bucket}")
-
-
-def create() -> str:
+    Implemented as a singleton: ``LargeBinaryManager()`` always returns the 
same
+    instance, so the cached S3 client and the current execution id are shared 
across
+    all callers in the worker process. A Python worker is a single process 
serving one
+    execution. Mirrors the JVM ``object LargeBinaryManager``.
     """
-    Creates a new largebinary reference with a unique S3 URI.
 
-    Returns:
-        S3 URI string (format: s3://bucket/key)
-    """
-    _ensure_bucket_exists(DEFAULT_BUCKET)
-    timestamp_ms = int(time.time() * 1000)
-    unique_id = uuid.uuid4()
-    object_key = f"objects/{timestamp_ms}/{unique_id}"
-    return f"s3://{DEFAULT_BUCKET}/{object_key}"
+    DEFAULT_BUCKET = "texera-large-binaries"

Review Comment:
   The bucket name "texera-large-binaries" is also written out in the Scala 
manager (`LargeBinaryManager.scala:33`). If one changes and the other doesn't, 
the two engines would quietly target different buckets — consider sourcing it 
from `StorageConfig`. (Goes away with the design in the proto-field note.)



##########
amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala:
##########
@@ -200,7 +200,8 @@ class WorkerSpec
         OpExecWithClassName(
           
"org.apache.texera.amber.engine.architecture.worker.DummyOperatorExecutor"
         ),
-        isSource = false
+        isSource = false,
+        executionId = None

Review Comment:
   `executionId = None` keeps this green but exercises a case production never 
produces. Passing `Some(...)` would run the same path the real system does.



##########
amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py:
##########
@@ -26,7 +26,12 @@
 
 class InitializeExecutorHandler(ControlHandler):
     async def initialize_executor(self, req: InitializeExecutorRequest) -> 
EmptyReturn:
+        from pytexera.storage.large_binary_manager import LargeBinaryManager

Review Comment:
   Two small readability notes: the import is inside the function rather than 
at the top — if that's to avoid a circular import, a one-line note would help; 
otherwise move it up. And `req.execution_id.id if req.execution_id else None` 
(next line) could use a short comment that an unset field arrives as `None`, so 
the missing case fails loudly in `create()`.



##########
amber/src/main/python/pytexera/storage/large_binary_manager.py:
##########
@@ -22,57 +22,94 @@
 and LargeBinaryInputStream/LargeBinaryOutputStream instead.
 """
 
-import time
 import uuid
 from loguru import logger
 from core.storage.storage_config import StorageConfig
 
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
 
+class LargeBinaryManager:
+    """Manages execution-scoped large binaries in S3 for a worker process.
 
-def _get_s3_client():
-    """Get or initialize S3 client (lazy initialization, cached)."""
-    global _s3_client
-    if _s3_client is None:
-        try:
-            import boto3
-            from botocore.config import Config
-        except ImportError as e:
-            raise RuntimeError("boto3 required. Install with: pip install 
boto3") from e
-
-        _s3_client = boto3.client(
-            "s3",
-            endpoint_url=StorageConfig.S3_ENDPOINT,
-            aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
-            aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
-            region_name=StorageConfig.S3_REGION,
-            config=Config(signature_version="s3v4", s3={"addressing_style": 
"path"}),
-        )
-    return _s3_client
-
-
-def _ensure_bucket_exists(bucket: str):
-    """Ensure S3 bucket exists, creating it if necessary."""
-    s3 = _get_s3_client()
-    try:
-        s3.head_bucket(Bucket=bucket)
-    except s3.exceptions.NoSuchBucket:
-        logger.debug(f"Bucket {bucket} not found, creating it")
-        s3.create_bucket(Bucket=bucket)
-        logger.info(f"Created bucket: {bucket}")
-
-
-def create() -> str:
+    Implemented as a singleton: ``LargeBinaryManager()`` always returns the 
same
+    instance, so the cached S3 client and the current execution id are shared 
across
+    all callers in the worker process. A Python worker is a single process 
serving one
+    execution. Mirrors the JVM ``object LargeBinaryManager``.
     """
-    Creates a new largebinary reference with a unique S3 URI.
 
-    Returns:
-        S3 URI string (format: s3://bucket/key)
-    """
-    _ensure_bucket_exists(DEFAULT_BUCKET)
-    timestamp_ms = int(time.time() * 1000)
-    unique_id = uuid.uuid4()
-    object_key = f"objects/{timestamp_ms}/{unique_id}"
-    return f"s3://{DEFAULT_BUCKET}/{object_key}"
+    DEFAULT_BUCKET = "texera-large-binaries"
+
+    _instance = None
+
+    def __new__(cls):
+        if cls._instance is None:
+            instance = super().__new__(cls)
+            instance._s3_client = None
+            # Execution context: set at executor init and read by create() so 
the
+            # user-facing largebinary() API stays execution-id-free.
+            instance._current_execution_id = None
+            cls._instance = instance
+        return cls._instance
+
+    def set_current_execution_id(self, execution_id):
+        """Sets the execution id used to scope large binaries created by this 
worker."""
+        self._current_execution_id = execution_id
+
+    def get_current_execution_id(self):
+        """Returns the execution id set for this worker, or None if unset."""
+        return self._current_execution_id
+
+    def _get_s3_client(self):
+        """Get or initialize the S3 client (lazy initialization, cached)."""
+        if self._s3_client is None:
+            try:
+                import boto3
+                from botocore.config import Config
+            except ImportError as e:
+                raise RuntimeError(
+                    "boto3 required. Install with: pip install boto3"
+                ) from e
+
+            self._s3_client = boto3.client(
+                "s3",
+                endpoint_url=StorageConfig.S3_ENDPOINT,
+                aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+                aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+                region_name=StorageConfig.S3_REGION,
+                config=Config(
+                    signature_version="s3v4", s3={"addressing_style": "path"}
+                ),
+            )
+        return self._s3_client
+
+    def _ensure_bucket_exists(self, bucket: str):
+        """Ensure the S3 bucket exists, creating it if necessary."""
+        s3 = self._get_s3_client()
+        try:
+            s3.head_bucket(Bucket=bucket)
+        except s3.exceptions.NoSuchBucket:
+            logger.debug(f"Bucket {bucket} not found, creating it")
+            s3.create_bucket(Bucket=bucket)
+            logger.info(f"Created bucket: {bucket}")
+
+    def create(self) -> str:
+        """
+        Creates a new largebinary reference with a unique, execution-scoped S3 
URI.
+
+        The object key is namespaced by the current execution id so cleanup 
can delete
+        only this execution's objects. The execution id is injected by the 
system (set
+        via set_current_execution_id() when the worker is initialized); 
callers never
+        pass it.
+
+        Returns:
+            S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid})
+        """
+        self._ensure_bucket_exists(self.DEFAULT_BUCKET)

Review Comment:
   Small ordering thing: this calls `_ensure_bucket_exists` (an S3 round-trip) 
before checking whether the execution id is set, so a worker with no id pays 
for an S3 call before failing. Check the id first.



##########
common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala:
##########
@@ -466,6 +476,24 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assert(readData.sameElements(data.getBytes))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
+  }
+
+  test("deleteByExecution removes only the target execution's binaries") {

Review Comment:
   This is the test that actually proves the fix — good to have it. One small 
thing: it leaves the worker's execution id set to 1002 when it exits and relies 
on the next test's `beforeEach` to reset. Resetting in the `finally` would keep 
it self-contained if someone adds a test below it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to