pan3793 opened a new pull request #35076:
URL: https://github.com/apache/spark/pull/35076
### What changes were proposed in this pull request?
When push-based shuffle is enabled, there is a chance that a task hangs at
```
59 Executor task launch worker for task 424.0 in stage 753.0 (TID 106778)
WAITING Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1660371198)
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:756)
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.sort_addToSorter_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_1$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
org.apache.spark.scheduler.Task.run(Task.scala:136)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$518/852390142.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1470)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
```
where `org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:756)` is
```
while (result == null) {
  ...
  result = results.take() // line 756
  ...
}
```
After some investigation, we found that the last `FetchResult` taken into
`result` was a `PushMergedLocalMetaFetchResult`. While handling it, there is a
chance that `bufs` is empty, in which case no `SuccessFetchResult` is added to
`results`; the thread then hangs forever if no other `FetchResult` is ever put
into `results`.
```scala
while (result == null) {
  ...
  result = results.take()
  ...
  result match {
    case r @ SuccessFetchResult(blockId, mapIndex, address, size, buf, isNetworkReqDone) =>
      ...
    case PushMergedLocalMetaFetchResult(
        shuffleId, shuffleMergeId, reduceId, bitmaps, localDirs) =>
      val shuffleBlockId = ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId)
      try {
        val bufs: Seq[ManagedBuffer] =
          blockManager.getLocalMergedBlockData(shuffleBlockId, localDirs)
        // THERE IS A CHANCE THAT bufs.isEmpty!
        ...
        bufs.zipWithIndex.foreach { case (buf, chunkId) =>
          buf.retain()
          val shuffleChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId, chunkId)
          pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
          results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
            pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
            isNetworkReqDone = false))
        }
      } catch {
        case e: Exception =>
          pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
            shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
      }
      result = null
      ...
  }
}
```
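To see why an empty `bufs` starves the consumer, here is a minimal, self-contained JVM sketch (plain Java; the class and result names are illustrative stand-ins, not Spark code). It models the `results` queue and a meta-fetch handler that enqueues nothing when the buffer list is empty, then uses `poll` with a timeout where the real code uses `take()` so the starvation is observable instead of an actual hang:

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EmptyProducerHang {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for ShuffleBlockFetcherIterator.results
        LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>();

        // Stand-in for getLocalMergedBlockData returning no buffers:
        // with an empty list the loop body never runs, so nothing is enqueued.
        List<String> bufs = List.of();
        for (String buf : bufs) {
            results.put("SuccessFetchResult(" + buf + ")");
        }

        // The real code calls results.take(), which would block forever here.
        // poll with a timeout makes the symptom visible.
        String result = results.poll(200, TimeUnit.MILLISECONDS);
        System.out.println(result == null
                ? "queue empty: take() would block forever"
                : result);
    }
}
```

Unlike `take()`, `poll(timeout, unit)` returns `null` when nothing arrives, which is exactly the condition the stuck executor thread can never observe.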
### Why are the changes needed?
Fall back to fetching the original blocks on `noLocalMergedBlockDataError`, to
avoid the task hang described above.
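The invariant the fix restores can be sketched as follows (a toy Java model with illustrative names such as `handleMetaFetch` and `FallbackFetchResult`; this is not the actual Spark patch): the meta-fetch handler must always enqueue *some* result, so the consumer's `take()` can never starve.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FallbackGuard {
    // If there is no local merged block data, enqueue a fallback marker
    // (standing in for initiating a fetch of the original, un-merged blocks)
    // instead of enqueuing nothing at all.
    static void handleMetaFetch(List<String> bufs, BlockingQueue<String> results)
            throws InterruptedException {
        if (bufs.isEmpty()) {
            results.put("FallbackFetchResult");
        } else {
            for (String buf : bufs) {
                results.put("SuccessFetchResult(" + buf + ")");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        handleMetaFetch(List.of(), results); // the previously-hanging case
        System.out.println(results.take());  // guaranteed to return now
    }
}
```

With the guard in place, the empty-`bufs` case produces a result the consumer loop can act on, rather than leaving the queue empty.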
### Does this PR introduce _any_ user-facing change?
No. This is a bug fix that makes push-based shuffle more stable.
### How was this patch tested?
Passed 1 TB TPC-DS tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]