siknezevic opened a new pull request #27246:
URL: https://github.com/apache/spark/pull/27246
### What changes were proposed in this pull request?
The following changes improve SQL execution performance when data is spilled to disk:
1) Implement lazy initialization of UnsafeSorterSpillReader, the iterator over spilled rows (see sketch 1 after this list):
   - During SortMergeJoin (Left Semi Join) execution, the iterator over the spilled data is created even when no iteration over that data is ever done.
   - Lazy initialization of UnsafeSorterSpillReader enables efficient processing of SortMergeJoin even when data is spilled to disk, because the unnecessary I/O is avoided.
2) Decrease the initial memory read buffer size in UnsafeSorterSpillReader from 1 MB to 1 KB (see sketch 2 after this list):
   - The UnsafeSorterSpillReader constructor takes a lot of time because of the default 1 MB memory read buffer.
   - The code already contains logic to grow the read buffer when a record does not fit, so decreasing the initial size to 1 KB is safe and has a positive performance impact.
3) Improve memory utilization when spilling is enabled in ExternalAppendOnlyUnsafeRowArray (see sketch 3 after this list):
   - In the current implementation, when spilling is enabled, an UnsafeExternalSorter object is created, the data is moved from the ExternalAppendOnlyUnsafeRowArray object into the UnsafeExternalSorter, and only then is the ExternalAppendOnlyUnsafeRowArray emptied. Just before it is emptied, both objects hold the same data in memory, doubling the memory footprint. This duplication can be avoided.
   - In the proposed solution, once spark.sql.sortMergeJoinExec.buffer.in.memory.threshold is reached, adding new rows to the ExternalAppendOnlyUnsafeRowArray object stops; an UnsafeExternalSorter object is created and new rows are added to it, while the ExternalAppendOnlyUnsafeRowArray retains all rows already added. This enables better memory utilization and avoids unnecessary movement of data from one object to another.
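Sketch 1, for change 1): a minimal, illustrative Scala sketch of the lazy-initialization idea, not the actual patch. `LazySpillIterator` and `openSpillReader` are hypothetical names standing in for the expensive UnsafeSorterSpillReader construction (file open plus buffer allocation).

```scala
// Illustrative sketch only (hypothetical names, not the actual patch):
// a generic iterator wrapper that defers the expensive construction of the
// underlying spill reader until an element is actually requested.
class LazySpillIterator[T](openSpillReader: () => Iterator[T]) extends Iterator[T] {
  // Scala's `lazy val` runs openSpillReader() on first access, so a join row
  // whose spilled side is never consumed triggers no file open and no I/O.
  private lazy val underlying: Iterator[T] = openSpillReader()

  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = underlying.next()
}

object LazySpillIteratorDemo extends App {
  // Constructing the wrapper is free; "opening spill file" is only printed
  // once hasNext is called for the first time.
  val it = new LazySpillIterator(() => {
    println("opening spill file")
    Iterator(1, 2, 3)
  })
  println("iterator created, no I/O yet")
  println(it.hasNext) // the reader is opened here
}
```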
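Sketch 2, for change 2): an illustrative sketch, assuming a simple length-prefixed record format, of the grow-on-demand buffer logic that makes a 1 KB initial buffer safe. `GrowableRecordReader` is a hypothetical name; Spark's actual reader differs in detail.

```scala
import java.io.DataInputStream

// Illustrative sketch (hypothetical class, assuming length-prefixed records):
// the read buffer starts at 1 KB and grows only when a record does not fit,
// which is the grow-on-demand behavior that makes shrinking the initial
// buffer from 1 MB to 1 KB safe.
class GrowableRecordReader(in: DataInputStream, initialSize: Int = 1024) {
  private var buffer = new Array[Byte](initialSize)

  /** Reads one length-prefixed record, enlarging the buffer if needed. */
  def readRecord(): Array[Byte] = {
    val length = in.readInt() // 4-byte record length prefix
    if (length > buffer.length) {
      // Double until the record fits; growth cost is amortized over reads.
      var newSize = buffer.length
      while (newSize < length) newSize *= 2
      buffer = new Array[Byte](newSize)
    }
    in.readFully(buffer, 0, length)
    java.util.Arrays.copyOf(buffer, length)
  }
}
```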
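Sketch 3, for change 3): an illustrative sketch of the proposed insertion strategy with simplified types; the `overflow` buffer stands in for a real spillable UnsafeExternalSorter. Rows buffered before the threshold stay where they are and only new rows go to the overflow structure, so the same data never lives in two objects at once.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch of the proposed insertion strategy (simplified types;
// `overflow` stands in for a real spillable UnsafeExternalSorter).
class HybridRowArray[T](inMemoryThreshold: Int) {
  private val inMemory = new ArrayBuffer[T]()
  private var overflow: ArrayBuffer[T] = null

  def add(row: T): Unit = {
    if (inMemory.length < inMemoryThreshold) {
      inMemory += row // below threshold: plain in-memory array path
    } else {
      // Created once, without copying the already-buffered rows over.
      if (overflow == null) overflow = new ArrayBuffer[T]()
      overflow += row
    }
  }

  // Iteration chains the retained in-memory rows with the overflow rows.
  def iterator: Iterator[T] =
    inMemory.iterator ++ (if (overflow == null) Iterator.empty else overflow.iterator)
}
```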
### Why are the changes needed?
Testing with the TPC-DS 100 TB benchmark data set showed that some queries (for example, query 14) cannot run even with extremely large Spark executor memory; Spark's spilling feature has to be enabled in order to process them, yet processing becomes extremely slow once spilling is enabled. Testing this solution with query 14 and spilling to disk enabled showed a 500x performance improvement, and it did not degrade the performance of the other TPC-DS queries.
In addition, these are micro-benchmark results that simulate 100000 joins with spilling enabled and NO patch applied. The micro-benchmark produces three spill files and, for each processed join row, creates an iterator over both the in-memory data and the data spilled to disk:
```
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~16.04-b09 on Linux 4.4.0-178-generic
Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz
Spilling SpillReader with 16000 rows:     Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
UnsafeSorterSpillReader                          932683         943898         NaN         0.0     3643292.4       1.0X
```
These are the micro-benchmark results for the same simulation of 100000 joins with spilling enabled and the patch applied. The micro-benchmark again creates three spill files and, for each processed join row, creates a "lazy" iterator over both the in-memory data and the data spilled to disk:
```
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~16.04-b09 on Linux 4.4.0-178-generic
Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz
Spilling SpillReader with 16000 rows:     Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
UnsafeSorterSpillReader_bufferSize1024              411            426          13         0.6        1607.2       1.0X
```
Comparing these two micro-benchmarks shows a more than **2000x** (943898 / 426 = 2215.7x) difference in performance. More spill files make performance even worse, because the current code reads the number of rows from each spilled fragment, producing I/O whose latency is measured in milliseconds. This is done for each join row, so with millions of join rows processing becomes very slow, as the results above show. Hence the need for this PR.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
By running TPC-DS queries with 10 TB and 100 TB data sets.
By running all Spark tests.