LantaoJin commented on a change in pull request #29558:
URL: https://github.com/apache/spark/pull/29558#discussion_r480673106



##########
File path: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
##########
@@ -130,25 +130,33 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: 
T, id: Long)
     // Store a copy of the broadcast variable in the driver so that tasks run 
on the driver
     // do not create a duplicate copy of the broadcast variable's value.
     val blockManager = SparkEnv.get.blockManager
-    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, 
tellMaster = false)) {
-      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
-    }
-    val blocks =
-      TorrentBroadcast.blockifyObject(value, blockSize, 
SparkEnv.get.serializer, compressionCodec)
-    if (checksumEnabled) {
-      checksums = new Array[Int](blocks.length)
-    }
-    blocks.zipWithIndex.foreach { case (block, i) =>
+    try {
+      if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, 
tellMaster = false)) {
+        throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+      }
+      val blocks =
+        TorrentBroadcast.blockifyObject(value, blockSize, 
SparkEnv.get.serializer, compressionCodec)
       if (checksumEnabled) {
-        checksums(i) = calcChecksum(block)
+        checksums = new Array[Int](blocks.length)
       }
-      val pieceId = BroadcastBlockId(id, "piece" + i)
-      val bytes = new ChunkedByteBuffer(block.duplicate())
-      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, 
tellMaster = true)) {
-        throw new SparkException(s"Failed to store $pieceId of $broadcastId in 
local BlockManager")
+      blocks.zipWithIndex.foreach { case (block, i) =>
+        if (checksumEnabled) {
+          checksums(i) = calcChecksum(block)
+        }
+        val pieceId = BroadcastBlockId(id, "piece" + i)
+        val bytes = new ChunkedByteBuffer(block.duplicate())
+        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, 
tellMaster = true)) {
+          throw new SparkException(s"Failed to store $pieceId of $broadcastId 
" +
+            s"in local BlockManager")
+        }
       }
+      blocks.length
+    } catch {
+      case t: Throwable =>
+        logError(s"Store broadcast $broadcastId fail, remove all pieces of the 
broadcast")
+        blockManager.removeBroadcast(id, tellMaster = true)

Review comment:
      We could start the try/catch at L137 if L134 is atomic. But I think 
`L157: blockManager.removeBroadcast(id, tellMaster = true)` is harmless even 
if the exception is thrown from L135. Please correct me if I have misunderstood.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to