siknezevic commented on issue #27246: [SPARK-30536][CORE][SQL] Sort-merge join 
operator spilling performance improvements
URL: https://github.com/apache/spark/pull/27246#issuecomment-580084167
 
 
   I ran a test to see what the impact on spilling performance is without the 
changes in the ExternalAppendOnlyUnsafeRowArray class. The design of 
MergerSorterIterator.hasNext() in ExternalAppendOnlyUnsafeRowArray is critical 
for good performance.
   
   Below is the stack of the code execution with lazy initialization of the 
spill reader but without changes in ExternalAppendOnlyUnsafeRowArray. As you 
can see, the invocation of SpillableArrayIterator.hasNext() causes 
initialization of UnsafeSorterSpillReader, and that produces I/O.
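   A minimal sketch of this eager path (the class and field names here, such 
as EagerSpillReader and ioPerformed, are illustrative assumptions, not Spark's 
actual code): hasNext() delegates straight to the underlying spill iterator, 
so the spill file must be touched merely to answer hasNext() for the first row.

```scala
// Hypothetical sketch of the eager path from the stack trace above.
// Names are assumptions for illustration, not the actual Spark classes.
class EagerSpillReader(records: Seq[Int]) {
  var ioPerformed = false // stands in for readFile()/readInt() in the real reader
  private lazy val it = { ioPerformed = true; records.iterator }
  def hasNext: Boolean = it.hasNext // first call triggers the "I/O"
  def next(): Int = it.next()
}

class SpillableIterator(reader: EagerSpillReader) {
  // No in-memory short-circuit: every hasNext() touches the spill reader.
  def hasNext: Boolean = reader.hasNext
  def next(): Int = reader.next()
}
```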
   
   The invocation of MergerSorterIterator.hasNext() does not cause 
initialization of UnsafeSorterSpillReader, so no I/O happens. The startIndex 
passed to MergerSorterIterator is 0, and startIndex initializes currentIndex, 
so we have currentIndex < numRowBufferedInMemory (0 < numRowBufferedInMemory). 
This means MergerSorterIterator.hasNext() does not invoke the 
iterator.hasNext() method, which is what happens in 
SpillableArrayIterator.hasNext(). The invocation of iterator.hasNext() is what 
produces the I/O.
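   The lazy path can be sketched like this (again, the names SpillReader, 
MergedIterator, and numRowBufferedInMemory are illustrative assumptions, not 
the exact Spark code): hasNext() consults the in-memory row count first, so 
the spill reader, and therefore the I/O, is only touched once the in-memory 
rows are exhausted.

```scala
// Hypothetical sketch: stands in for the expensive spill-file read.
class SpillReader(records: Seq[Int]) {
  var opened = false // flips on first access, modeling the deferred I/O
  private val it = records.iterator
  def hasNext: Boolean = { opened = true; it.hasNext }
  def next(): Int = { opened = true; it.next() }
}

// Iterates in-memory rows first, then spilled rows. hasNext() checks the
// in-memory index before delegating, so the spill reader stays untouched
// while currentIndex < numRowBufferedInMemory.
class MergedIterator(inMemory: IndexedSeq[Int], spill: SpillReader, startIndex: Int) {
  private var currentIndex = startIndex
  private val numRowBufferedInMemory = inMemory.length
  def hasNext: Boolean =
    currentIndex < numRowBufferedInMemory || spill.hasNext
  def next(): Int =
    if (currentIndex < numRowBufferedInMemory) {
      val row = inMemory(currentIndex); currentIndex += 1; row
    } else spill.next()
}
```

With startIndex = 0 and a non-empty in-memory buffer, hasNext() short-circuits 
on the first condition and the spill reader is never initialized.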
   
   Could you please let me know if I should still submit a PR for lazy spill 
reader initialization?
   Without the changes in the ExternalAppendOnlyUnsafeRowArray class, that PR 
would not improve spilling performance. It would make sense only as a 
“transition” PR, to be followed by a PR with the 
ExternalAppendOnlyUnsafeRowArray changes.
   
   
   sun.misc.Unsafe.park(Native Method)
   java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
   
org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:235)
   org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:276)
   org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:255)
   java.io.DataInputStream.readInt(DataInputStream.java:387)
   
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.readFile(UnsafeSorterSpillReader.java:175)
   
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.getNumRecords(UnsafeSorterSpillReader.java:72)
   
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$ChainedIterator.initializeNumRecords(UnsafeExternalSorter.java:733)
   
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$ChainedIterator.hasNext(UnsafeExternalSorter.java:700)
   
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray$SpillableArrayIterator.hasNext(ExternalAppendOnlyUnsafeRowArray.scala:214)
   
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$2.advanceNext(SortMergeJoinExec.scala:278)
   
org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
   
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage12.agg_doAggregateWithKeys_0$(Unknown Source)
   
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage12.processNext(Unknown Source)
   
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
   scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
   
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
   org.apache.spark.scheduler.Task.run(Task.scala:123)
   
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
   org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   java.lang.Thread.run(Thread.java:748)
   
