kunwp1 commented on code in PR #5280:
URL: https://github.com/apache/texera/pull/5280#discussion_r3366022883


##########
amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py:
##########
@@ -26,7 +26,12 @@
 
 class InitializeExecutorHandler(ControlHandler):
     async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn:
+        from pytexera.storage.large_binary_manager import LargeBinaryManager

Review Comment:
   Decided not to change proto.



##########
amber/src/main/python/pytexera/storage/large_binary_manager.py:
##########
@@ -22,57 +22,94 @@
 and LargeBinaryInputStream/LargeBinaryOutputStream instead.
 """
 
-import time
 import uuid
 from loguru import logger
 from core.storage.storage_config import StorageConfig
 
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
 
+class LargeBinaryManager:
+    """Manages execution-scoped large binaries in S3 for a worker process.
 
-def _get_s3_client():
-    """Get or initialize S3 client (lazy initialization, cached)."""
-    global _s3_client
-    if _s3_client is None:
-        try:
-            import boto3
-            from botocore.config import Config
-        except ImportError as e:
-            raise RuntimeError("boto3 required. Install with: pip install boto3") from e
-
-        _s3_client = boto3.client(
-            "s3",
-            endpoint_url=StorageConfig.S3_ENDPOINT,
-            aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
-            aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
-            region_name=StorageConfig.S3_REGION,
-            config=Config(signature_version="s3v4", s3={"addressing_style": "path"}),
-        )
-    return _s3_client
-
-
-def _ensure_bucket_exists(bucket: str):
-    """Ensure S3 bucket exists, creating it if necessary."""
-    s3 = _get_s3_client()
-    try:
-        s3.head_bucket(Bucket=bucket)
-    except s3.exceptions.NoSuchBucket:
-        logger.debug(f"Bucket {bucket} not found, creating it")
-        s3.create_bucket(Bucket=bucket)
-        logger.info(f"Created bucket: {bucket}")
-
-
-def create() -> str:
+    Implemented as a singleton: ``LargeBinaryManager()`` always returns the same
+    instance, so the cached S3 client and the current execution id are shared across
+    all callers in the worker process. A Python worker is a single process serving one
+    execution. Mirrors the JVM ``object LargeBinaryManager``.
     """
-    Creates a new largebinary reference with a unique S3 URI.
 
-    Returns:
-        S3 URI string (format: s3://bucket/key)
-    """
-    _ensure_bucket_exists(DEFAULT_BUCKET)
-    timestamp_ms = int(time.time() * 1000)
-    unique_id = uuid.uuid4()
-    object_key = f"objects/{timestamp_ms}/{unique_id}"
-    return f"s3://{DEFAULT_BUCKET}/{object_key}"
+    DEFAULT_BUCKET = "texera-large-binaries"

Review Comment:
   Defined in StorageConfig. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to