Rafig9501 opened a new pull request, #51042:
URL: https://github.com/apache/spark/pull/51042

   
   ### What changes were proposed in this pull request?
   
   This PR introduces a new feature to offload the intermediate results of 
`RDD.collect()` to S3-compatible storage (e.g., MinIO) to reduce driver memory 
pressure during large result collection. The enhancement adds logic to:
   
   - Write partition results to S3 in the task phase.
   - Stream data from S3 in the driver phase to construct the final collected 
array.
   - Clean up temporary session directories based on configuration.
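
The three steps above can be sketched roughly as follows. This is a minimal illustration and not the PR's implementation: it uses a local directory as a stand-in for the S3 path, plain Python lists as stand-ins for RDD partitions, and `pickle` for serialization. The names `write_partition`, `stream_partitions`, and `offloaded_collect` are hypothetical.

```python
import os
import pickle
import shutil
import tempfile
from typing import Any, Iterator, List


def write_partition(session_dir: str, index: int, records: List[Any]) -> str:
    """Task phase: persist one partition's records as a single object."""
    # Zero-padded names keep lexicographic order equal to partition order.
    path = os.path.join(session_dir, f"part-{index:05d}.pkl")
    with open(path, "wb") as f:
        pickle.dump(records, f)
    return path


def stream_partitions(session_dir: str) -> Iterator[Any]:
    """Driver phase: yield records one partition at a time, in partition order."""
    for name in sorted(os.listdir(session_dir)):
        with open(os.path.join(session_dir, name), "rb") as f:
            yield from pickle.load(f)


def offloaded_collect(
    partitions: List[List[Any]], base_path: str, cleanup: bool = True
) -> List[Any]:
    """Write every partition under a fresh session directory, then read them back."""
    # Assumption: one temporary session directory per collect call.
    session_dir = tempfile.mkdtemp(prefix="collect-session-", dir=base_path)
    try:
        for i, records in enumerate(partitions):
            write_partition(session_dir, i, records)
        return list(stream_partitions(session_dir))
    finally:
        # Mirrors the cleanup setting: remove the session directory if requested.
        if cleanup:
            shutil.rmtree(session_dir, ignore_errors=True)
```

In this sketch the driver side only ever holds one partition's deserialized records in addition to the output list, which is the memory effect the PR is aiming for; how the actual patch serializes and names objects is not shown in this description.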
   
   The implementation introduces the following new configuration flags:
   
   - `spark.rdd.collect.offloadToS3.enabled` – enables/disables the offloading 
logic.
   - `spark.rdd.collect.s3.path` – sets the target S3 path for temporary data.
   - `spark.rdd.collect.s3.cleanup` – controls whether S3 offload data should 
be cleaned up after collection.
   
   If offloading fails (e.g., on an S3 write failure), the implementation 
falls back to the default `collect()` path, so a storage error does not fail 
the collection.
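
The fallback behavior described above might look something like the sketch below. It is an assumption-laden illustration: the function name `collect_with_fallback`, the logger name, and the choice to treat `OSError` as the storage failure signal are all hypothetical and not taken from the patch.

```python
import logging
from typing import Any, Callable, List

logger = logging.getLogger("collect_offload")  # hypothetical logger name


def collect_with_fallback(
    offloaded: Callable[[], List[Any]],
    default: Callable[[], List[Any]],
) -> List[Any]:
    """Try the offloaded path first; on a storage error, use the default path."""
    try:
        return offloaded()
    except OSError as exc:
        # Assumption: storage failures surface as I/O errors in this sketch.
        logger.warning(
            "Offloaded collect failed (%s); falling back to default collect()", exc
        )
        return default()
```

Catching only I/O-style errors, rather than every exception, keeps genuine job failures visible instead of silently retrying them through the default path.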
   
   ### Why are the changes needed?
   
   The default `RDD.collect()` behavior places the burden of materializing all 
partition results on the driver, which may lead to OOM errors when collecting 
large datasets. By offloading partition results to S3 during task execution and 
streaming them back in the driver, we significantly reduce the driver's memory 
footprint.
   
   This is especially helpful for:
   
   - Collecting very large RDDs (e.g., tens of millions of records).
   - Environments with memory-constrained drivers.
   - Scenarios where partial failure resilience (via fallback) is desirable.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes.
   
   This PR introduces three new user-facing Spark configuration properties:
   
   - `spark.rdd.collect.offloadToS3.enabled` (default: false)
   - `spark.rdd.collect.s3.path` (no default; must be explicitly set)
   - `spark.rdd.collect.s3.cleanup` (default: true)
   
   If offloading is enabled and properly configured, the driver no longer 
receives all partitions' data in memory directly from the executors.
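
As a rough illustration of how the three properties and their defaults combine, the sketch below resolves them from a plain key/value mapping. The property names and defaults come from the list above; the helper names, the `OffloadSettings` type, and the rule that an enabled flag without a path leaves offloading inactive are assumptions for illustration only.

```python
from typing import Mapping, NamedTuple, Optional


class OffloadSettings(NamedTuple):
    enabled: bool
    path: Optional[str]
    cleanup: bool


def _as_bool(value: str) -> bool:
    return value.strip().lower() == "true"


def resolve_offload_settings(conf: Mapping[str, str]) -> OffloadSettings:
    """Apply the documented defaults: enabled=false, no path, cleanup=true."""
    enabled = _as_bool(conf.get("spark.rdd.collect.offloadToS3.enabled", "false"))
    path = conf.get("spark.rdd.collect.s3.path") or None
    cleanup = _as_bool(conf.get("spark.rdd.collect.s3.cleanup", "true"))
    return OffloadSettings(enabled, path, cleanup)


def offload_active(settings: OffloadSettings) -> bool:
    """Assumption: offloading only applies when enabled AND a path is set."""
    return settings.enabled and settings.path is not None
```

For example, a mapping containing only `spark.rdd.collect.offloadToS3.enabled=true` resolves to an inactive offload in this sketch, because the path has no default and must be set explicitly.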
   
   ### How was this patch tested?
   
   This feature was tested via a custom multi-phase validation suite using 
`spark-shell`, structured as follows:
   
   - **Basic Functional Tests**: Validate offloading behavior with small RDDs 
and toggle settings (`offload`, `cleanup`).
   - **Edge Cases**:
     - Empty RDDs
     - RDDs with case class objects (serialization validation)
     - Disabling offloading to check fallback behavior
   - **Stress Test**:
     - RDDs with ~20 million records to simulate high memory pressure.
   - **Error Handling**:
     - Simulate misconfigured or inaccessible S3 paths to verify safe fallback 
to default `collect()` logic.
   
   Testing included checking:
   
   - MinIO (S3) for session directory creation and cleanup
   - Logs for correct phase transitions and fallback messages
   - Collection result accuracy using `assert()`s on output
   
   A full testing guide is included in the project documentation for 
reproducibility.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

