cxzl25 opened a new pull request, #38467:
URL: https://github.com/apache/spark/pull/38467

   ### What changes were proposed in this pull request?
   Avoid creating a directory when deleting a block.
   
   ### Why are the changes needed?
   When the driver submits a job, DAGScheduler calls 
   sc.broadcast(taskBinaryBytes). The following sequence can then deadlock:
   1. TorrentBroadcast#writeBlocks calls blockManager#putBytes, which may fail because of a disk problem.
   2. BlockManager#doPut calls BlockManager#removeBlockInternal to clean up the block, which in turn calls DiskStore#remove to clean up the block on disk.
   3. DiskStore#remove tries to create the block's directory because it does not exist, and this throws a second exception.
   4. As a result, the block's entry and write lock in BlockInfoManager#blockInfoWrappers are never removed or released.
   5. The catch block in TorrentBroadcast#writeBlocks then calls blockManager.removeBroadcast to clean up the broadcast. Because the block's write lock was never released, the dag-scheduler-event-loop thread of DAGScheduler waits for it forever.
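   The lock leak in steps 2-4 can be illustrated with a minimal, self-contained sketch (plain Scala, not Spark's actual classes; `doPut` and `removeBlockInternal` here only mirror the names above):

   ```scala
   import java.io.IOException
   import java.util.concurrent.locks.ReentrantLock

   // Sketch only: a write lock taken in doPut is normally released on the
   // cleanup path. If the cleanup itself throws (here: a simulated
   // directory-creation failure in removeBlockInternal), the unlock is
   // never reached, and any later removeBroadcast blocks on the lock forever.
   object LockLeakSketch {
     val blockLock = new ReentrantLock() // stands in for the BlockInfo write lock

     private def removeBlockInternal(): Unit = {
       // simulates DiskStore#remove failing while creating a missing directory
       throw new IOException("cannot create directory")
     }

     def doPut(): Unit = {
       blockLock.lock()
       try {
         throw new IOException("disk issue") // simulated putBytes failure
       } catch {
         case e: IOException =>
           removeBlockInternal() // throws before we ever unlock
           blockLock.unlock()    // never reached -> lock leaked
           throw e
       }
     }
   }
   ```

   After `doPut()` fails, `blockLock` is still held, which is the state the stack trace below shows the event-loop thread waiting on.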
   
   ```
   22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed 
due to exception java.io.IOException: XXXXX.
   22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, 
remove all pieces of the broadcast 
   ```
   
   ```
   "dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 
tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]
      java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x00000007add3d8c8> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
       at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
       at 
org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
       at 
org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
       at 
org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown 
Source)
       at 
org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
       at 
org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
       at 
org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
       at 
org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
       at 
org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
       at 
org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
       at 
org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown 
Source)
       at scala.collection.Iterator.foreach(Iterator.scala:943)
       at scala.collection.Iterator.foreach$(Iterator.scala:943)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
       at 
org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
       at 
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
       at 
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
       at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
       at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
       at 
org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
       at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
       at 
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
       at 
org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
       at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
       at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
       at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
       at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) 
   ```
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Throw an exception before `Files.createDirectory` to simulate disk problems.
   
   In `DiskBlockManager#getFile`:
   ```scala
   if (filename.contains("piece")) {
     throw new java.io.IOException("disk issue")
   }
   Files.createDirectory(path)
   ```
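   For comparison, the idea behind the fix can be sketched as a lookup that has no side effects on the delete path (illustrative names only, not Spark's actual `DiskBlockManager` API):

   ```scala
   import java.io.File

   // Sketch only: resolve a block file for deletion without creating
   // directories. If the subdirectory is missing, there is nothing to
   // delete, so return None instead of attempting the mkdir that threw
   // the IOException in the failure scenario above.
   object DeleteLookupSketch {
     def resolveForDelete(localDir: File, subDirName: String, filename: String): Option[File] = {
       val subDir = new File(localDir, subDirName)
       if (!subDir.exists()) None
       else Some(new File(subDir, filename)).filter(_.exists())
     }
   }
   ```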
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

