cxzl25 opened a new pull request, #38467:
URL: https://github.com/apache/spark/pull/38467
### What changes were proposed in this pull request?
Avoid creating a directory when deleting a block.
### Why are the changes needed?
When the driver submits a job, DAGScheduler calls `sc.broadcast(taskBinaryBytes)`. `TorrentBroadcast#writeBlocks` may fail during `blockManager#putBytes` due to disk problems.
`BlockManager#doPut` then calls `BlockManager#removeBlockInternal` to clean up the block, which in turn calls `DiskStore#remove` to clean up the block on disk.
Because the block's directory does not exist, `DiskStore#remove` tries to create it and throws an exception, so the block info and lock in `BlockInfoManager#blockInfoWrappers` are never removed.
The catch block in `TorrentBroadcast#writeBlocks` then calls `blockManager.removeBroadcast` to clean up the broadcast, but because the block lock in `BlockInfoManager#blockInfoWrappers` was never released, the `dag-scheduler-event-loop` thread of DAGScheduler waits forever.
```
22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX.
22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
```
```
"dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
    at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
    at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source)
    at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
    at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
    at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
    at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
    at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
    at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
    at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
    at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
```
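The failure mode in the stack trace above is a leaked per-block lock: the thread that failed the put never releases the lock because the cleanup path itself throws, so a later remover parks on it forever. The following is a minimal, self-contained Java sketch of that pattern (all names, such as `putThatFailsDuringCleanup`, are hypothetical; Spark's actual `BlockInfoManager` is Scala and uses its own condition-based locking):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LeakedLockDemo {
    // Hypothetical stand-in for the per-block write lock in BlockInfoManager.
    static final ReentrantLock blockLock = new ReentrantLock();

    // Models doPut: acquire the lock, fail mid-write, and then have the
    // cleanup path (the removeBlockInternal analogue) throw as well, so
    // the unlock is never reached.
    static void putThatFailsDuringCleanup() {
        blockLock.lock();
        try {
            throw new java.io.IOException("disk issue");       // write fails
        } catch (java.io.IOException writeFailure) {
            // The cleanup itself throws (like DiskStore#remove failing to
            // create the missing directory), skipping any lock release.
            throw new RuntimeException("mkdir failed during cleanup");
        }
    }

    public static void main(String[] args) {
        Thread writer = new Thread(() -> {
            try { putThatFailsDuringCleanup(); } catch (RuntimeException ignored) { }
        });
        writer.start();
        try {
            writer.join();
            // Models removeBroadcast on dag-scheduler-event-loop: the lock was
            // leaked by the failed put, so this bounded wait times out. (The
            // real code waits unboundedly, which is the hang.)
            boolean acquired = blockLock.tryLock(200, TimeUnit.MILLISECONDS);
            System.out.println("acquired=" + acquired);        // prints acquired=false
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```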
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Throw an exception before `Files.createDirectory` in `DiskBlockManager#getFile` to simulate disk problems:
```scala
if (filename.contains("piece")) {
  throw new java.io.IOException("disk issue")
}
Files.createDirectory(path)
```
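The intended behaviour can be sketched as follows: resolving a block file for a write may create its parent directory, but the delete path should only check for the file and never attempt a mkdir, so a faulty disk cannot turn cleanup into a fresh failure. This is a hypothetical Java sketch (`getFile` and `remove` are stand-ins; Spark's actual `DiskBlockManager`/`DiskStore` code is Scala and differs in detail):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DiskStoreSketch {
    // Write-path resolution: creating the parent directory is acceptable
    // here, but it can throw on a broken disk.
    static Path getFile(Path root, String blockId) throws IOException {
        Path dir = root.resolve(blockId.substring(0, 2));
        if (!Files.exists(dir)) {
            Files.createDirectories(dir);   // may fail on disk problems
        }
        return dir.resolve(blockId);
    }

    // Delete-path resolution: check for the file without creating any
    // directories; a missing file simply means there is nothing to remove.
    static boolean remove(Path root, String blockId) {
        Path file = root.resolve(blockId.substring(0, 2)).resolve(blockId);
        if (Files.exists(file)) {
            try {
                Files.delete(file);
                return true;
            } catch (IOException e) {
                return false;
            }
        }
        return false;   // nothing on disk, and crucially no mkdir attempt
    }
}
```

With this split, a block whose directory was never created is removed as a no-op instead of throwing, so the caller's lock cleanup can proceed.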