mridulm commented on a change in pull request #29558:
URL: https://github.com/apache/spark/pull/29558#discussion_r486436039
##########
File path: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
##########
@@ -130,25 +130,33 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
val blockManager = SparkEnv.get.blockManager
- if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
- throw new SparkException(s"Failed to store $broadcastId in BlockManager")
- }
- val blocks =
- TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
- if (checksumEnabled) {
- checksums = new Array[Int](blocks.length)
- }
- blocks.zipWithIndex.foreach { case (block, i) =>
+ try {
+ if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
+ throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+ }
+ val blocks =
+ TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
if (checksumEnabled) {
- checksums(i) = calcChecksum(block)
+ checksums = new Array[Int](blocks.length)
}
- val pieceId = BroadcastBlockId(id, "piece" + i)
- val bytes = new ChunkedByteBuffer(block.duplicate())
- if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
- throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+ blocks.zipWithIndex.foreach { case (block, i) =>
+ if (checksumEnabled) {
+ checksums(i) = calcChecksum(block)
+ }
+ val pieceId = BroadcastBlockId(id, "piece" + i)
+ val bytes = new ChunkedByteBuffer(block.duplicate())
+ if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
+ throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
+ s"in local BlockManager")
+ }
}
+ blocks.length
+ } catch {
+ case t: Throwable =>
+ logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
+ blockManager.removeBroadcast(id, tellMaster = true)
Review comment:
I agree with @dongjoon-hyun: the `blockManager.putSingle` call for `broadcastId`
should be outside the try/catch. The rest looks fine.
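
To make the suggestion concrete, here is a minimal sketch of one way to read it: the initial `putSingle` runs before the `try`, and only the per-piece writes are covered by the cleanup handler. It is not the PR's actual code. `FakeBlockStore`, `BroadcastWriteSketch`, the string block ids, and the use of `IllegalStateException` are hypothetical stand-ins so the example runs without Spark; the real `BlockManager` signatures differ.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for Spark's BlockManager (not the real API).
// Any block id listed in `failOn` is rejected, to simulate a failed store.
final class FakeBlockStore(failOn: Set[String] = Set.empty) {
  private val blocks = mutable.Map.empty[String, Array[Byte]]

  private def put(blockId: String, bytes: Array[Byte]): Boolean =
    if (failOn.contains(blockId)) false
    else { blocks(blockId) = bytes; true }

  def putSingle(blockId: String, value: Array[Byte]): Boolean = put(blockId, value)
  def putBytes(blockId: String, bytes: Array[Byte]): Boolean = put(blockId, bytes)

  // Drops the whole-value block and every piece belonging to this broadcast.
  def removeBroadcast(id: Long): Unit = {
    val prefix = s"broadcast_$id"
    val doomed = blocks.keys.filter(k => k == prefix || k.startsWith(prefix + "_")).toList
    blocks --= doomed
  }

  def storedIds: Set[String] = blocks.keySet.toSet
}

object BroadcastWriteSketch {
  // Returns the number of pieces written, mirroring `blocks.length` in the diff.
  def writeBlocks(store: FakeBlockStore, id: Long, value: Array[Byte], blockSize: Int): Int = {
    val broadcastId = s"broadcast_$id"

    // Outside the try: if this first store fails, nothing has been written yet,
    // so there is nothing for the handler to clean up.
    if (!store.putSingle(broadcastId, value)) {
      throw new IllegalStateException(s"Failed to store $broadcastId in BlockManager")
    }

    try {
      // Simplified stand-in for blockifyObject: split the bytes into fixed-size pieces.
      val pieces = value.grouped(blockSize).toArray
      pieces.zipWithIndex.foreach { case (piece, i) =>
        val pieceId = s"${broadcastId}_piece$i"
        if (!store.putBytes(pieceId, piece)) {
          throw new IllegalStateException(
            s"Failed to store $pieceId of $broadcastId in local BlockManager")
        }
      }
      pieces.length
    } catch {
      case t: Throwable =>
        // A piece failed after the first store succeeded: remove everything
        // written for this broadcast, then rethrow the original error.
        store.removeBroadcast(id)
        throw t
    }
  }
}
```

With this shape, a failure in `putSingle` propagates directly without triggering `removeBroadcast`, while a failure on any later piece still removes both the whole-value block and the pieces already written.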