Yicong-Huang commented on code in PR #5280:
URL: https://github.com/apache/texera/pull/5280#discussion_r3337960000
##########
amber/src/main/python/pytexera/storage/large_binary_manager.py:
##########
@@ -22,57 +22,92 @@
and LargeBinaryInputStream/LargeBinaryOutputStream instead.
"""
-import time
import uuid
from loguru import logger
from core.storage.storage_config import StorageConfig
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
+class LargeBinaryManager:
+ """Manages execution-scoped large binaries in S3 for a worker process.
-def _get_s3_client():
- """Get or initialize S3 client (lazy initialization, cached)."""
- global _s3_client
- if _s3_client is None:
- try:
- import boto3
- from botocore.config import Config
- except ImportError as e:
- raise RuntimeError("boto3 required. Install with: pip install
boto3") from e
-
- _s3_client = boto3.client(
- "s3",
- endpoint_url=StorageConfig.S3_ENDPOINT,
- aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
- aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
- region_name=StorageConfig.S3_REGION,
- config=Config(signature_version="s3v4", s3={"addressing_style":
"path"}),
- )
- return _s3_client
-
-
-def _ensure_bucket_exists(bucket: str):
- """Ensure S3 bucket exists, creating it if necessary."""
- s3 = _get_s3_client()
- try:
- s3.head_bucket(Bucket=bucket)
- except s3.exceptions.NoSuchBucket:
- logger.debug(f"Bucket {bucket} not found, creating it")
- s3.create_bucket(Bucket=bucket)
- logger.info(f"Created bucket: {bucket}")
-
-
-def create() -> str:
+ A Python worker is a single process serving one execution, so a single
shared
+ instance (the module-level ``large_binary_manager``) holds the cached S3
client
+ and the current execution id. Mirrors the JVM ``LargeBinaryManager``.
"""
- Creates a new largebinary reference with a unique S3 URI.
- Returns:
- S3 URI string (format: s3://bucket/key)
- """
- _ensure_bucket_exists(DEFAULT_BUCKET)
- timestamp_ms = int(time.time() * 1000)
- unique_id = uuid.uuid4()
- object_key = f"objects/{timestamp_ms}/{unique_id}"
- return f"s3://{DEFAULT_BUCKET}/{object_key}"
+ DEFAULT_BUCKET = "texera-large-binaries"
+
+ def __init__(self):
+ self._s3_client = None
+ # Execution context: set at executor init and read by create() so the
+ # user-facing largebinary() API stays execution-id-free.
+ self._current_execution_id = None
+
+ def set_current_execution_id(self, execution_id):
+ """Sets the execution id used to scope large binaries created by this
worker."""
+ self._current_execution_id = execution_id
+
+ def get_current_execution_id(self):
+ """Returns the execution id set for this worker, or None if unset."""
+ return self._current_execution_id
+
+ def _get_s3_client(self):
+ """Get or initialize the S3 client (lazy initialization, cached)."""
+ if self._s3_client is None:
+ try:
+ import boto3
+ from botocore.config import Config
+ except ImportError as e:
+ raise RuntimeError(
+ "boto3 required. Install with: pip install boto3"
+ ) from e
+
+ self._s3_client = boto3.client(
+ "s3",
+ endpoint_url=StorageConfig.S3_ENDPOINT,
+ aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+ aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+ region_name=StorageConfig.S3_REGION,
+ config=Config(
+ signature_version="s3v4", s3={"addressing_style": "path"}
+ ),
+ )
+ return self._s3_client
+
+ def _ensure_bucket_exists(self, bucket: str):
+ """Ensure the S3 bucket exists, creating it if necessary."""
+ s3 = self._get_s3_client()
+ try:
+ s3.head_bucket(Bucket=bucket)
+ except s3.exceptions.NoSuchBucket:
+ logger.debug(f"Bucket {bucket} not found, creating it")
+ s3.create_bucket(Bucket=bucket)
+ logger.info(f"Created bucket: {bucket}")
+
+ def create(self) -> str:
+ """
+ Creates a new largebinary reference with a unique, execution-scoped S3
URI.
+
+ The object key is namespaced by the current execution id so cleanup
can delete
+ only this execution's objects. The execution id is injected by the
system (set
+ via set_current_execution_id() when the worker is initialized);
callers never
+ pass it.
+
+ Returns:
+ S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid})
+ """
+ self._ensure_bucket_exists(self.DEFAULT_BUCKET)
+ execution_id = self.get_current_execution_id()
+ if execution_id is None:
+ raise RuntimeError(
+ "largebinary() requires an execution context, but no execution
id "
+ "has been set for this worker."
+ )
+ unique_id = uuid.uuid4()
+ object_key = f"objects/{execution_id}/{unique_id}"
+ return f"s3://{self.DEFAULT_BUCKET}/{object_key}"
+
+
+# Shared singleton for the worker process. Consumers import this instance:
+# from pytexera.storage.large_binary_manager import large_binary_manager
+large_binary_manager = LargeBinaryManager()
Review Comment:
Thanks, it looks correct to me. It would still be good to add a simple test
case to guard this behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]