mridulm commented on a change in pull request #35960:
URL: https://github.com/apache/spark/pull/35960#discussion_r834947208
##########
File path: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
##########
@@ -148,16 +149,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
val pieceId = BroadcastBlockId(id, "piece" + i)
val bytes = new ChunkedByteBuffer(block.duplicate())
if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
- throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
-   s"in local BlockManager")
+ throw SparkCoreErrors.storeBlockError(pieceId)
}
}
blocks.length
} catch {
case t: Throwable =>
logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
blockManager.removeBroadcast(id, tellMaster = true)
- throw t
+ throw SparkCoreErrors.storeBlockError(broadcastId, t)
Review comment:
Two issues with this:
a) We are changing the type of the exception being thrown (note: we are
catching `Throwable`, so this replaces whatever error actually occurred).
b) The message of the exception thrown at L151 is getting changed at L160.
##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -325,4 +325,24 @@ object SparkCoreErrors {
new SparkException(errorClass = "GRAPHITE_SINK_PROPERTY_MISSING",
messageParameters = Array(missingProperty), cause = null)
}
+
+ def storeBlockError(blockId: BlockId, cause: Throwable = null): Throwable = {
+ new SparkException(errorClass = "STORE_BLOCK_ERROR",
+ messageParameters = Array(blockId.toString), cause = cause)
+ }
+
Review comment:
Let us remove `getBlockError`, `corruptedRemoteBlockError`, and
`getLocalBlockError` — each of them replaces only a single invocation site.
##########
File path: core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
##########
@@ -304,6 +308,76 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
      assert(results.collect().toSet === (1 to partitions).map(x => (x, list.sum)).toSet)
}
}
+
+ test("Error in BlockManager.putSingle") {
+ val mockBlockManager = mock(classOf[BlockManager])
+ when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(false)
+ sc = sparkContextWithBlockManager(mockBlockManager)
+ val thrown = intercept[SparkException] { sc.broadcast(Array(1)) }
+ assert(thrown.getErrorClass == "STORE_BLOCK_ERROR")
+ assert(thrown.getMessage.startsWith("Failed to store block: "))
+ }
+
+ test("Error in BlockManager.putBytes") {
+ val mockBlockManager = mock(classOf[BlockManager])
+ when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(false)
+ sc = sparkContextWithBlockManager(mockBlockManager)
+ val thrown = intercept[SparkException] { sc.broadcast(Array(1)) }
+ assert(thrown.getErrorClass == "STORE_BLOCK_ERROR")
+ assert(thrown.getMessage.startsWith("Failed to store block: "))
+ }
+
+ test("Error when getting local blocks") {
+ val mockBlockManager = mock(classOf[BlockManager])
+ when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(true)
+ when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(true)
+ val mockBlockResult = mock(classOf[BlockResult])
+ when(mockBlockResult.data).thenReturn(Iterator.empty)
+    when(mockBlockManager.getLocalValues(any[BlockId])).thenReturn(Some(mockBlockResult))
+ sc = sparkContextWithBlockManager(mockBlockManager)
+ val thrown = intercept[SparkException] { sc.broadcast(Array(1)).value }
+ assert(thrown.getErrorClass == "GET_LOCAL_BLOCK_ERROR")
+ assert(thrown.getMessage.startsWith("Failed to get local block: "))
+ }
+
+ test("Error when getting remote blocks") {
+ val mockBlockManager = mock(classOf[BlockManager])
+ when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(true)
+ when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(true)
+ when(mockBlockManager.getLocalValues(any[BlockId])).thenReturn(None)
+ when(mockBlockManager.getLocalBytes(any[BlockId])).thenReturn(None)
+ when(mockBlockManager.getRemoteBytes(any[BlockId])).thenReturn(None)
+ sc = sparkContextWithBlockManager(mockBlockManager)
+ val thrown = intercept[SparkException] { sc.broadcast(Array(1)).value }
+ assert(thrown.getErrorClass == "GET_BLOCK_ERROR")
+ assert(thrown.getMessage.startsWith("Failed to get block: "))
+ }
+
+ test("Corrupted remote blocks") {
+ val mockBlockManager = mock(classOf[BlockManager])
+ when(mockBlockManager.putSingle(any, any, any, any)(any)).thenReturn(true)
+ when(mockBlockManager.putBytes(any, any, any, any)(any)).thenReturn(true)
+ when(mockBlockManager.getLocalValues(any[BlockId])).thenReturn(None)
+ when(mockBlockManager.getLocalBytes(any[BlockId])).thenReturn(None)
+ when(mockBlockManager.getRemoteBytes(any[BlockId])).thenReturn(
+ Some(new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))))
+ sc = sparkContextWithBlockManager(mockBlockManager)
+ val thrown = intercept[SparkException] { sc.broadcast(Array(1)).value }
+ assert(thrown.getErrorClass == "CORRUPTED_REMOTE_BLOCK")
+ assert(thrown.getMessage.startsWith("Corrupted remote block: "))
+ }
+
+ private def sparkContextWithBlockManager(blockManager: BlockManager): SparkContext = {
+ new SparkContext(new SparkConf().setAppName("test").setMaster("local")) {
+ override private[spark] def createSparkEnv(
+ conf: SparkConf,
+ isLocal: Boolean,
+ listenerBus:
LiveListenerBus) = {
Review comment:
Fix indentation here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]