bozhang2820 commented on a change in pull request #35960:
URL: https://github.com/apache/spark/pull/35960#discussion_r835077079



##########
File path: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
##########
@@ -148,16 +149,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: 
T, id: Long)
         val pieceId = BroadcastBlockId(id, "piece" + i)
         val bytes = new ChunkedByteBuffer(block.duplicate())
         if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, 
tellMaster = true)) {
-          throw new SparkException(s"Failed to store $pieceId of $broadcastId 
" +
-            s"in local BlockManager")
+          throw SparkCoreErrors.storeBlockError(pieceId)
         }
       }
       blocks.length
     } catch {
       case t: Throwable =>
         logError(s"Store broadcast $broadcastId fail, remove all pieces of the 
broadcast")
         blockManager.removeBroadcast(id, tellMaster = true)
-        throw t
+        throw SparkCoreErrors.storeBlockError(broadcastId, t)

Review comment:
       Will change back to throw the caught Throwable here.

##########
File path: core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
##########
@@ -304,6 +308,76 @@ class BroadcastSuite extends SparkFunSuite with 
LocalSparkContext with Encryptio
       assert(results.collect().toSet === (1 to partitions).map(x => (x, 
list.sum)).toSet)
     }
   }
+
+  test("Error in BlockManager.putSingle") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(false)
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)) }
+    assert(thrown.getErrorClass == "STORE_BLOCK_ERROR")
+    assert(thrown.getMessage.startsWith("Failed to store block: "))
+  }
+
+  test("Error in BlockManager.putBytes") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(false)
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)) }
+    assert(thrown.getErrorClass == "STORE_BLOCK_ERROR")
+    assert(thrown.getMessage.startsWith("Failed to store block: "))
+  }
+
+  test("Error when getting local blocks") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(true)
+    val mockBlockResult = mock(classOf[BlockResult])
+    when(mockBlockResult.data).thenReturn(Iterator.empty)
+    
when(mockBlockManager.getLocalValues(any[BlockId])).thenReturn(Some(mockBlockResult))
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)).value }
+    assert(thrown.getErrorClass == "GET_LOCAL_BLOCK_ERROR")
+    assert(thrown.getMessage.startsWith("Failed to get local block: "))
+  }
+
+  test("Error when getting remote blocks") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.getLocalValues(any[BlockId])).thenReturn(None)
+    when(mockBlockManager.getLocalBytes(any[BlockId])).thenReturn(None)
+    when(mockBlockManager.getRemoteBytes(any[BlockId])).thenReturn(None)
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)).value }
+    assert(thrown.getErrorClass == "GET_BLOCK_ERROR")
+    assert(thrown.getMessage.startsWith("Failed to get block: "))
+  }
+
+  test("Corrupted remote blocks") {
+    val mockBlockManager = mock(classOf[BlockManager])
+    when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(true)
+    when(mockBlockManager.getLocalValues(any[BlockId])).thenReturn(None)
+    when(mockBlockManager.getLocalBytes(any[BlockId])).thenReturn(None)
+    when(mockBlockManager.getRemoteBytes(any[BlockId])).thenReturn(
+      Some(new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))))
+    sc = sparkContextWithBlockManager(mockBlockManager)
+    val thrown = intercept[SparkException] { sc.broadcast(Array(1)).value }
+    assert(thrown.getErrorClass == "CORRUPTED_REMOTE_BLOCK")
+    assert(thrown.getMessage.startsWith("Corrupted remote block: "))
+  }
+
+  private def sparkContextWithBlockManager(blockManager: BlockManager): 
SparkContext = {
+    new SparkContext(new SparkConf().setAppName("test").setMaster("local")) {
+      override private[spark] def createSparkEnv(
+                                                  conf: SparkConf,
+                                                  isLocal: Boolean,
+                                                  listenerBus: 
LiveListenerBus) = {

Review comment:
       Will do.

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -325,4 +325,24 @@ object SparkCoreErrors {
     new SparkException(errorClass = "GRAPHITE_SINK_PROPERTY_MISSING",
       messageParameters = Array(missingProperty), cause = null)
   }
+
+  def storeBlockError(blockId: BlockId, cause: Throwable = null): Throwable = {
+    new SparkException(errorClass = "STORE_BLOCK_ERROR",
+      messageParameters = Array(blockId.toString), cause = cause)
+  }
+

Review comment:
       Not sure what you mean by "replacing a single invocation site". Could 
you elaborate?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to