siknezevic commented on issue #27246: [SPARK-30536][CORE][SQL] Sort-merge join operator spilling performance improvements
URL: https://github.com/apache/spark/pull/27246#issuecomment-580084167

I ran a test to see what the impact on spilling performance is without the changes in the ExternalAppendOnlyUnsafeRowArray class. The design of MergerSorterIterator.hasNext() in ExternalAppendOnlyUnsafeRowArray is critical for good performance.

Below is the stack trace of the code execution that has lazy initialization of the spill reader but no changes in ExternalAppendOnlyUnsafeRowArray. As you can see, the invocation of SpillableArrayIterator.hasNext() causes initialization of UnsafeSorterSpillReader, and that initialization produces I/O. The invocation of MergerSorterIterator.hasNext(), on the other hand, does not cause initialization of UnsafeSorterSpillReader, so the I/O does not happen. The startIndex passed to MergerSorterIterator is 0, and startIndex initializes currentIndex, so we have currentIndex < numRowBufferedInMemory (0 < numRowBufferedInMemory). This means that MergerSorterIterator.hasNext() does not invoke the iterator.hasNext() method, which is exactly what SpillableArrayIterator.hasNext() does, and invoking iterator.hasNext() is what produces the I/O (a minimal sketch of this difference follows the stack trace below).

Could you please let me know if I should still submit a PR for lazy spill reader initialization? Without the changes in the ExternalAppendOnlyUnsafeRowArray class, that PR would not improve spilling performance. It would make sense only as a "transition" PR that is then followed by a PR for the ExternalAppendOnlyUnsafeRowArray changes.

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:235)
org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:276)
org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:255)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.readFile(UnsafeSorterSpillReader.java:175)
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.getNumRecords(UnsafeSorterSpillReader.java:72)
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$ChainedIterator.initializeNumRecords(UnsafeExternalSorter.java:733)
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$ChainedIterator.hasNext(UnsafeExternalSorter.java:700)
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray$SpillableArrayIterator.hasNext(ExternalAppendOnlyUnsafeRowArray.scala:214)
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$2.advanceNext(SortMergeJoinExec.scala:278)
org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage12.agg_doAggregateWithKeys_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage12.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
org.apache.spark.scheduler.Task.run(Task.scala:123)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
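To make the distinction concrete, here is a minimal, self-contained sketch of the two hasNext() behaviours described above. All names in it (SimulatedSpillIterator, EagerIterator, LazyIterator, and the println standing in for the spill-reader file I/O) are illustrative assumptions, not the actual Spark classes.

```scala
// Sketch only: illustrates why one hasNext() triggers spill-reader I/O and the other does not.
object HasNextSketch {

  // Stand-in for the chained spill iterator: the first hasNext() call "opens"
  // the spill reader, which is where the real code performs file I/O.
  final class SimulatedSpillIterator(records: Seq[String]) extends Iterator[String] {
    private var initialized = false
    private val it = records.iterator
    override def hasNext: Boolean = {
      if (!initialized) {
        println("initializing spill reader (the expensive I/O step)")
        initialized = true
      }
      it.hasNext
    }
    override def next(): String = it.next()
  }

  // SpillableArrayIterator-style behaviour: hasNext() delegates straight to the
  // underlying iterator, so the spill reader is initialized immediately.
  final class EagerIterator(spill: SimulatedSpillIterator) extends Iterator[String] {
    override def hasNext: Boolean = spill.hasNext
    override def next(): String = spill.next()
  }

  // MergerSorterIterator-style behaviour as described in the comment: while
  // currentIndex < numRowBufferedInMemory, hasNext() answers from the in-memory
  // buffer and never touches the spill reader.
  final class LazyIterator(
      inMemory: IndexedSeq[String],
      spill: SimulatedSpillIterator,
      startIndex: Int = 0) extends Iterator[String] {
    private val numRowBufferedInMemory = inMemory.length
    private var currentIndex = startIndex
    override def hasNext: Boolean =
      currentIndex < numRowBufferedInMemory || spill.hasNext
    override def next(): String = {
      if (currentIndex < numRowBufferedInMemory) {
        val row = inMemory(currentIndex)
        currentIndex += 1
        row
      } else {
        spill.next()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val eager = new EagerIterator(new SimulatedSpillIterator(Seq("s1")))
    eager.hasNext // triggers the simulated spill-reader initialization right away

    val lazyIt = new LazyIterator(IndexedSeq("m1", "m2"), new SimulatedSpillIterator(Seq("s1")))
    lazyIt.hasNext // answered from the in-memory count; no spill-reader initialization
  }
}
```

The point of the sketch: with startIndex = 0 and a non-empty in-memory buffer, the lazy variant keeps answering hasNext() from the index comparison alone, so the spill reader (and its I/O) is only touched once the in-memory rows are exhausted.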
