siknezevic opened a new pull request #27246:
URL: https://github.com/apache/spark/pull/27246


   ### What changes were proposed in this pull request?
   
   The following changes improve SQL execution performance when data is 
spilled to disk:
   1)   Lazily initialize UnsafeSorterSpillReader, the iterator over the 
spilled rows:
           ... During SortMergeJoin (Left Semi Join) execution, an iterator over 
the spilled data is created, but the data is never iterated.
           ... Lazy initialization of UnsafeSorterSpillReader lets SortMergeJoin 
run efficiently even when data has been spilled to disk, because the 
unnecessary I/O is avoided.
   2)   Decrease the initial read buffer size in UnsafeSorterSpillReader 
from 1 MB to 1 KB:
           ... The UnsafeSorterSpillReader constructor takes a lot of time 
because of the default 1 MB read buffer.
           ... The code already has logic to grow the read buffer when the 
data does not fit, so decreasing the initial size to 1 KB is safe and has a 
positive performance impact.
   3)   Improve memory utilization when spilling is enabled in 
ExternalAppendOnlyUnsafeRowArray:
           ... In the current implementation, when spilling is enabled, an 
UnsafeExternalSorter object is created, the data is moved from the 
ExternalAppendOnlyUnsafeRowArray object into the UnsafeExternalSorter, and then 
the ExternalAppendOnlyUnsafeRowArray object is emptied. Just before it is 
emptied, both objects hold the same data in memory. This duplicates the data 
and requires double the memory, which can be avoided.
           ... In the proposed solution, when 
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold is reached, no more rows 
are added to the ExternalAppendOnlyUnsafeRowArray object. An 
UnsafeExternalSorter object is created and new rows are added to it instead. 
The ExternalAppendOnlyUnsafeRowArray object retains all the rows already added 
to it. This approach uses memory better and avoids unnecessary movement of 
data from one object to another.
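
As an illustration of change 3 only, here is a minimal sketch of the "stop adding, overflow elsewhere" idea. It is not the code in this PR: `TwoTierRowBuffer` and `SpillSink` are hypothetical names, rows are plain strings, and the sink keeps rows in a list instead of writing them to disk. It only shows that rows added before the threshold stay where they are and are never copied.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a spillable sorter; it just keeps rows in a list. */
final class SpillSink {
    final List<String> rows = new ArrayList<>();
    void insert(String row) { rows.add(row); }
}

/** Hypothetical two-tier buffer: rows up to the threshold stay in memory, later rows go to the sink. */
final class TwoTierRowBuffer implements Iterable<String> {
    private final int inMemoryThreshold;
    private final List<String> inMemory = new ArrayList<>();
    private SpillSink sink; // created only once the threshold has been reached

    TwoTierRowBuffer(int inMemoryThreshold) { this.inMemoryThreshold = inMemoryThreshold; }

    void add(String row) {
        if (inMemory.size() < inMemoryThreshold) {
            inMemory.add(row);               // below the threshold: keep in memory
        } else {
            if (sink == null) sink = new SpillSink();
            sink.insert(row);                // at the threshold: new rows only, nothing is moved
        }
    }

    int inMemoryCount() { return inMemory.size(); }
    int spilledCount() { return sink == null ? 0 : sink.rows.size(); }

    /** Iterates the in-memory rows first, then the overflow rows, in insertion order. */
    @Override
    public Iterator<String> iterator() {
        Iterator<String> first = inMemory.iterator();
        Iterator<String> second =
            sink == null ? Collections.<String>emptyIterator() : sink.rows.iterator();
        return new Iterator<String>() {
            public boolean hasNext() { return first.hasNext() || second.hasNext(); }
            public String next() { return first.hasNext() ? first.next() : second.next(); }
        };
    }
}

final class TwoTierRowBufferDemo {
    public static void main(String[] args) {
        TwoTierRowBuffer buffer = new TwoTierRowBuffer(2); // threshold of 2 rows, for illustration
        for (String row : new String[] {"a", "b", "c", "d"}) buffer.add(row);
        System.out.println("in memory: " + buffer.inMemoryCount()
            + ", overflowed: " + buffer.spilledCount());
        for (String row : buffer) System.out.println(row);
    }
}
```

At no point do both tiers hold the same row, which is the property the description above relies on.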
   
   ### Why are the changes needed?
   
   Testing with the TPC-DS 100 TB benchmark data set showed that some queries 
(for example, query 14) cannot run even with extremely large Spark executor 
memory. Spilling has to be enabled to process these queries, but processing 
becomes extremely slow once spilling is enabled. Testing this change with 
query 14 and spilling to disk enabled showed a 500X performance improvement, 
and it did not degrade the performance of the other queries in the TPC-DS 
benchmark.
   
   In addition, the following micro-benchmark results simulate 100000 joins 
with spilling enabled and NO patch applied. The micro-benchmark produces three 
spill files and, for each processed join row, creates an iterator over the 
in-memory data and the data spilled to disk:
   
   OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~16.04-b09 on Linux 4.4.0-178-generic
   Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz

   | Spilling SpillReader with 16000 rows | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
   |---|---|---|---|---|---|---|
   | UnsafeSorterSpillReader | 932683 | **943898** | NaN | 0.0 | 3643292.4 | 1.0X |
   
   The following micro-benchmark results simulate 100000 joins with spilling 
enabled and the patch APPLIED. The micro-benchmark produces three spill files 
and, for each processed join row, creates a "lazy" iterator over the in-memory 
data and the data spilled to disk:
   
   OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~16.04-b09 on Linux 4.4.0-178-generic
   Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz

   | Spilling SpillReader with 16000 rows | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
   |---|---|---|---|---|---|---|
   | UnsafeSorterSpillReader_bufferSize1024 | 411 | **426** | 13 | 0.6 | 1607.2 | 1.0X |
   
   Comparing these two micro-benchmarks shows a performance difference of 
more than **2000X** (943898/426 = 2215.72X). More spill files mean slower 
performance, because the current code reads a number of rows from each spill 
file, and that produces I/O (which takes milliseconds). This is done for every 
join row, so with millions of join rows the processing is very slow, as the 
results above show. Hence the need for this PR.
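
The per-join-row cost described above can be sketched in a few lines. This is a minimal illustration, not Spark's `UnsafeSorterSpillReader`: `LazySpillReader`, the length-prefixed record layout, and the `encode` helper are all assumptions made for the example. It shows a reader whose constructor does no I/O, which opens its source only on first use, and which starts with a 1 KB buffer that grows when a record does not fit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical lazy reader over [recordCount][len][bytes]... data; illustration only. */
final class LazySpillReader {
    private final Supplier<InputStream> source;
    private DataInputStream in;             // stays null until the first hasNext()/loadNext()
    private int remaining;                  // number of records left, valid once opened
    private byte[] buffer = new byte[1024]; // small initial buffer, grown on demand

    LazySpillReader(Supplier<InputStream> source) { this.source = source; } // no I/O here

    private void ensureOpen() {
        if (in != null) return;
        try {
            in = new DataInputStream(source.get()); // the first real I/O happens here
            remaining = in.readInt();               // header: number of records
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean hasNext() { ensureOpen(); return remaining > 0; }

    /** Loads the next record into the buffer and returns its length in bytes. */
    int loadNext() {
        ensureOpen();
        try {
            int len = in.readInt();
            if (len > buffer.length) buffer = new byte[len]; // grow only when needed
            in.readFully(buffer, 0, len);
            remaining--;
            return len;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int bufferCapacity() { return buffer.length; }
}

final class LazySpillReaderDemo {
    /** Builds an in-memory "spill file" holding zero-filled records of the given lengths. */
    static byte[] encode(int... recordLengths) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(recordLengths.length);
            for (int len : recordLengths) { out.writeInt(len); out.write(new byte[len]); }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] spill = encode(10, 4096);
        AtomicInteger opens = new AtomicInteger(); // counts how often the source is opened
        Supplier<InputStream> source = () -> {
            opens.incrementAndGet();
            return new ByteArrayInputStream(spill);
        };
        for (int i = 0; i < 100000; i++) new LazySpillReader(source); // created, never iterated
        System.out.println("opens after 100000 unused readers: " + opens.get());
        LazySpillReader reader = new LazySpillReader(source);
        while (reader.hasNext()) {
            int len = reader.loadNext();
            System.out.println("record of " + len + " bytes, capacity " + reader.bufferCapacity());
        }
        System.out.println("opens after one full read: " + opens.get());
    }
}
```

With an eager constructor, each of the 100000 unused readers in the demo would have opened and read from its source; here only the reader that is actually iterated does.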
   
   ### Does this PR introduce any user-facing change?
   
   No
   
   ### How was this patch tested?
   
   By running TPC-DS queries against two data sets, 10 TB and 100 TB.
   By running all Spark tests.
   
   
   

