cxzl25 opened a new pull request, #38467: URL: https://github.com/apache/spark/pull/38467
### What changes were proposed in this pull request? Avoid creating a directory when deleting a block. ### Why are the changes needed? When the driver submits a job, DAGScheduler calls sc.broadcast(taskBinaryBytes). TorrentBroadcast#writeBlocks may fail due to disk problems during blockManager#putBytes. BlockManager#doPut calls BlockManager#removeBlockInternal to clean up the block. BlockManager#removeBlockInternal calls DiskStore#remove to clean up blocks on disk. DiskStore#remove will try to create the directory because the directory does not exist, and an exception will be thrown at this time. BlockInfoManager#blockInfoWrappers block info and lock not removed. The catch block in TorrentBroadcast#writeBlocks will call blockManager.removeBroadcast to clean up the broadcast. Because the block lock in BlockInfoManager#blockInfoWrappers is not released, the dag-scheduler-event-loop thread of DAGScheduler will wait forever. ``` 22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX. 22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast ``` ``` "dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221) at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214) at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source) at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105) at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214) at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979) at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970) at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970) at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179) at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78) at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539) at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Throw an exception before `Files.createDirectory` to simulate disk problems. DiskBlockManager#getFile ```java if (filename.contains("piece")) { throw new java.io.IOException("disk issue") } Files.createDirectory(path) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org