Sinisa Knezevic created SPARK-30536:
---------------------------------------

             Summary: Sort-merge join operator spilling performance improvements
                 Key: SPARK-30536
                 URL: https://issues.apache.org/jira/browse/SPARK-30536
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, SQL
    Affects Versions: 2.4.4, 2.4.3
            Reporter: Sinisa Knezevic


Testing with the TPC-DS 100 TB benchmark data set showed that some SQL queries (for example, query 14) are not able to run even with extremely large Spark executor memory. The Spark spilling feature has to be enabled in order to process these queries, but their processing becomes extremely slow once spilling is enabled. The spilling feature is controlled by two parameters: 
"spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" and 
"spark.sql.sortMergeJoinExec.buffer.spill.threshold"

"spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" – when this threshold 
is reached, the data is moved from the ExternalAppendOnlyUnsafeRowArray object 
into an UnsafeExternalSorter object.

"spark.sql.sortMergeJoinExec.buffer.spill.threshold" – when this threshold is 
reached, the data is spilled from the UnsafeExternalSorter object onto the 
disk.
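The interaction of the two thresholds can be sketched with a minimal model. All names here are illustrative (TwoTierBuffer is not a real Spark class; plain ArrayBuffers stand in for ExternalAppendOnlyUnsafeRowArray and UnsafeExternalSorter):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the current two-tier policy: buffer rows in memory until the
// first threshold, then move everything into a sorter, which spills to disk
// once the second threshold is reached.
final class TwoTierBuffer[T](inMemoryThreshold: Int, spillThreshold: Int) {
  private val inMemory = ArrayBuffer.empty[T] // role of ExternalAppendOnlyUnsafeRowArray
  private val sorter   = ArrayBuffer.empty[T] // role of UnsafeExternalSorter
  private var spilledToDisk = false

  def add(row: T): Unit = {
    if (inMemory.size < inMemoryThreshold && sorter.isEmpty) {
      inMemory += row
    } else {
      if (sorter.isEmpty) {      // first overflow: bulk-move the buffered rows
        sorter ++= inMemory
        inMemory.clear()
      }
      sorter += row
      if (sorter.size >= spillThreshold) spilledToDisk = true
    }
  }

  def spilled: Boolean = spilledToDisk
  def size: Int = inMemory.size + sorter.size
}
```

With thresholds of 4 and 10, the first 4 rows stay in the in-memory array; row 5 triggers the bulk move into the sorter; row 10 marks the data as spilled.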

 

During execution of a sort-merge join (Left Semi Join), the "right matches" for 
each left-side row are found and stored in an ExternalAppendOnlyUnsafeRowArray 
object. In the case of query 14 there are millions of rows of "right matches". 
To run this query, spilling is enabled and the data is moved from 
ExternalAppendOnlyUnsafeRowArray into UnsafeExternalSorter and then spilled 
onto the disk. When millions of rows are processed on the left side of the 
join, an iterator on top of the spilled "right matches" rows is created for 
each one, which means the iterator over the right matches (spilled on the 
disk) is created millions of times. Because the current Spark implementation 
performs I/O whenever it creates an iterator on top of spilled rows, 
processing millions of left-side rows results in millions of I/O operations.

 

To avoid this performance bottleneck, this JIRA introduces the following solution:

1. Implement lazy initialization of UnsafeSorterSpillReader, the iterator on top 
of spilled rows:
    … During SortMergeJoin (Left Semi Join) execution, the iterator on the 
spill data is often created but no iteration over the data is done.
    … Lazy initialization of UnsafeSorterSpillReader will enable efficient 
processing of SortMergeJoin even if data is spilled onto disk, because the 
unnecessary I/O will be avoided.
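The lazy-initialization idea can be illustrated with a generic iterator wrapper (an assumed sketch, not the actual UnsafeSorterSpillReader code): opening the spill file is deferred until a record is first requested, so an iterator that is created but never consumed costs no I/O.

```scala
// Wraps a file-opening function and delays calling it until first use.
final class LazySpillReader[A](openFile: () => Iterator[A]) extends Iterator[A] {
  // The spill file is NOT opened here; construction stays cheap.
  private var underlying: Option[Iterator[A]] = None

  private def reader: Iterator[A] = underlying match {
    case Some(it) => it
    case None =>
      val it = openFile()       // the only place I/O happens, at most once
      underlying = Some(it)
      it
  }

  override def hasNext: Boolean = reader.hasNext
  override def next(): A = reader.next()
}
```

Constructing millions of such readers for left-side rows whose matches are never consumed performs no file I/O at all; the cost is paid only by the readers that are actually iterated.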

2. Decrease the initial memory read buffer size in UnsafeSorterSpillReader from 
1 MB to 1 KB:
    … The UnsafeSorterSpillReader constructor takes a lot of time due to the 
default 1 MB memory read buffer.
    … The code already has logic to increase the memory read buffer if it 
cannot fit the data, so decreasing the initial size to 1 KB is safe and has a 
positive performance impact.
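The grow-on-demand logic that makes the smaller initial size safe can be sketched as follows (illustrative names only; the real reader's buffer management differs in detail):

```scala
// A read buffer that starts at 1 KB and doubles until a record fits,
// mirroring the grow-if-too-small behavior referred to above.
final class GrowableReadBuffer(initialSize: Int = 1024) {
  private var buf = new Array[Byte](initialSize)

  def ensureCapacity(needed: Int): Array[Byte] = {
    if (needed > buf.length) {
      var newLen = buf.length
      while (newLen < needed) newLen *= 2   // double until the record fits
      buf = new Array[Byte](newLen)         // grow only when actually required
    }
    buf
  }

  def capacity: Int = buf.length
}
```

Most spilled records are far smaller than 1 MB, so the common case never grows the buffer, while the rare large record still gets the capacity it needs.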

3. Improve memory utilization when spilling is enabled in 
ExternalAppendOnlyUnsafeRowArray:

    … In the current implementation, when spilling is enabled, an 
UnsafeExternalSorter object is created, the data is moved from the 
ExternalAppendOnlyUnsafeRowArray object into the UnsafeExternalSorter, and 
then the ExternalAppendOnlyUnsafeRowArray object is emptied. Just before it is 
emptied, both objects hold the same data in memory. This requires double the 
memory and duplicates the data, which can be avoided.

    … In the proposed solution, when 
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold is reached, adding new 
rows into the ExternalAppendOnlyUnsafeRowArray object stops. An 
UnsafeExternalSorter object is created and new rows are added to it, while the 
ExternalAppendOnlyUnsafeRowArray object retains all rows already added to it. 
This approach enables better memory utilization and avoids unnecessary 
movement of data from one object into another.
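The proposed retain-instead-of-move policy can be sketched in the same style as before (RetainingBuffer is an illustrative name; plain buffers again stand in for the Spark classes):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the proposed policy: once the in-memory threshold is hit, the
// already-buffered rows stay where they are, and only NEW rows go to the
// sorter — no bulk copy, no transient double allocation.
final class RetainingBuffer[T](inMemoryThreshold: Int) {
  private val inMemory = ArrayBuffer.empty[T] // retained, never drained
  private val sorter   = ArrayBuffer.empty[T] // role of UnsafeExternalSorter

  def add(row: T): Unit =
    if (inMemory.size < inMemoryThreshold) inMemory += row
    else sorter += row                        // overflow rows only

  // Reading back visits the retained rows first, then the sorter's rows.
  def iterator: Iterator[T] = inMemory.iterator ++ sorter.iterator
  def inMemoryCount: Int = inMemory.size
}
```

Compared with the current move-then-empty approach, peak memory holds each row exactly once, and no copy of the already-buffered rows is ever made.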

 

Testing this solution with query 14 and spilling to disk enabled showed a 500X 
performance improvement, with no performance degradation for the other queries 
in the TPC-DS benchmark.


