kunwp1 commented on code in PR #5280:
URL: https://github.com/apache/texera/pull/5280#discussion_r3366022530


##########
amber/src/main/python/pytexera/storage/large_binary_manager.py:
##########
@@ -22,57 +22,94 @@
 and LargeBinaryInputStream/LargeBinaryOutputStream instead.
 """
 
-import time
 import uuid
 from loguru import logger
 from core.storage.storage_config import StorageConfig
 
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
 
+class LargeBinaryManager:
+    """Manages execution-scoped large binaries in S3 for a worker process.
 
-def _get_s3_client():
-    """Get or initialize S3 client (lazy initialization, cached)."""
-    global _s3_client
-    if _s3_client is None:
-        try:
-            import boto3
-            from botocore.config import Config
-        except ImportError as e:
-            raise RuntimeError("boto3 required. Install with: pip install boto3") from e
-
-        _s3_client = boto3.client(
-            "s3",
-            endpoint_url=StorageConfig.S3_ENDPOINT,
-            aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
-            aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
-            region_name=StorageConfig.S3_REGION,
-            config=Config(signature_version="s3v4", s3={"addressing_style": "path"}),
-        )
-    return _s3_client
-
-
-def _ensure_bucket_exists(bucket: str):
-    """Ensure S3 bucket exists, creating it if necessary."""
-    s3 = _get_s3_client()
-    try:
-        s3.head_bucket(Bucket=bucket)
-    except s3.exceptions.NoSuchBucket:
-        logger.debug(f"Bucket {bucket} not found, creating it")
-        s3.create_bucket(Bucket=bucket)
-        logger.info(f"Created bucket: {bucket}")
-
-
-def create() -> str:
+    Implemented as a singleton: ``LargeBinaryManager()`` always returns the same
+    instance, so the cached S3 client and the current execution id are shared across
+    all callers in the worker process. A Python worker is a single process serving one
+    execution. Mirrors the JVM ``object LargeBinaryManager``.
     """
-    Creates a new largebinary reference with a unique S3 URI.
 
-    Returns:
-        S3 URI string (format: s3://bucket/key)
-    """
-    _ensure_bucket_exists(DEFAULT_BUCKET)
-    timestamp_ms = int(time.time() * 1000)
-    unique_id = uuid.uuid4()
-    object_key = f"objects/{timestamp_ms}/{unique_id}"
-    return f"s3://{DEFAULT_BUCKET}/{object_key}"
+    DEFAULT_BUCKET = "texera-large-binaries"
+
+    _instance = None
+
+    def __new__(cls):
+        if cls._instance is None:
+            instance = super().__new__(cls)
+            instance._s3_client = None
+            # Execution context: set at executor init and read by create() so the
+            # user-facing largebinary() API stays execution-id-free.
+            instance._current_execution_id = None
+            cls._instance = instance
+        return cls._instance
+
+    def set_current_execution_id(self, execution_id):
+        """Sets the execution id used to scope large binaries created by this worker."""
+        self._current_execution_id = execution_id
+
+    def get_current_execution_id(self):
+        """Returns the execution id set for this worker, or None if unset."""
+        return self._current_execution_id
+
+    def _get_s3_client(self):
+        """Get or initialize the S3 client (lazy initialization, cached)."""
+        if self._s3_client is None:
+            try:
+                import boto3
+                from botocore.config import Config
+            except ImportError as e:
+                raise RuntimeError(
+                    "boto3 required. Install with: pip install boto3"
+                ) from e
+
+            self._s3_client = boto3.client(
+                "s3",
+                endpoint_url=StorageConfig.S3_ENDPOINT,
+                aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+                aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+                region_name=StorageConfig.S3_REGION,
+                config=Config(
+                    signature_version="s3v4", s3={"addressing_style": "path"}
+                ),
+            )
+        return self._s3_client
+
+    def _ensure_bucket_exists(self, bucket: str):
+        """Ensure the S3 bucket exists, creating it if necessary."""
+        s3 = self._get_s3_client()
+        try:
+            s3.head_bucket(Bucket=bucket)
+        except s3.exceptions.NoSuchBucket:
+            logger.debug(f"Bucket {bucket} not found, creating it")
+            s3.create_bucket(Bucket=bucket)
+            logger.info(f"Created bucket: {bucket}")
+
+    def create(self) -> str:
+        """
+        Creates a new largebinary reference with a unique, execution-scoped S3 URI.
+
+        The object key is namespaced by the current execution id so cleanup can delete
+        only this execution's objects. The execution id is injected by the system (set
+        via set_current_execution_id() when the worker is initialized); callers never
+        pass it.
+
+        Returns:
+            S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid})
+        """
+        self._ensure_bucket_exists(self.DEFAULT_BUCKET)

Review Comment:
   This comment is outdated after I made some changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to