Adrian Muraru created SPARK-27734:
-------------------------------------

             Summary: Add memory based thresholds for shuffle spill
                 Key: SPARK-27734
                 URL: https://issues.apache.org/jira/browse/SPARK-27734
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, SQL
    Affects Versions: 2.4.3
            Reporter: Adrian Muraru


When running large shuffles (700 TB of input data, 200k map tasks, 50k reducers on 
a 300-node cluster), the job regularly OOMs in both the map and reduce phases.

IIUC, ShuffleExternalSorter (map side) and ExternalAppendOnlyMap/ExternalSorter 
(reduce side) try to max out the available execution memory. This in turn doesn't 
play nicely with the garbage collector, and executors fail with OutOfMemoryError 
when the memory allocated by these in-memory structures maxes out the available 
heap (in our case we run with 9 cores and 32 GB per executor).

To mitigate this, I set {{spark.shuffle.spill.numElementsForceSpillThreshold}} 
to force spilling to disk. While this config works, it is not flexible enough: 
it is expressed as a number of elements, and in our case we run multiple 
shuffles in a single job and record sizes differ from one stage to another.
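
For reference, a minimal sketch (the threshold value is illustrative only) of how we currently apply the element-count workaround when building the session:

{code:scala}
import org.apache.spark.sql.SparkSession

// Current workaround: force a spill after a fixed number of records,
// regardless of how large each record is. The threshold value is
// illustrative only and has to be re-tuned per job/stage.
val spark = SparkSession.builder()
  .appName("shuffle-spill-workaround")
  .config("spark.shuffle.spill.numElementsForceSpillThreshold", 5000000L)
  .getOrCreate()
{code}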

We have an internal patch that extends this behaviour with two new parameters 
to control the spill based on memory usage (example usage is sketched below the 
list):

- {{spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold}}
- {{spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold}}
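
As an illustration only (these properties come from our internal patch, not upstream Spark, and the byte values below are arbitrary), usage would look like:

{code:scala}
import org.apache.spark.sql.SparkSession

// Proposed usage (hypothetical, from our internal patch): spill once the
// in-memory shuffle structures exceed a byte-size threshold, controlled
// separately for the map and the reduce side. Values are illustrative.
val spark = SparkSession.builder()
  .appName("shuffle-spill-by-size")
  .config("spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold", 4L * 1024 * 1024 * 1024)    // 4 GiB
  .config("spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold", 2L * 1024 * 1024 * 1024) // 2 GiB
  .getOrCreate()
{code}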

 


