cloud-fan commented on a change in pull request #24533: [SPARK-27637][Shuffle]
For nettyBlockTransferService, if IOException occurred while fetching data,
check whether relative executor is alive before retry
URL: https://github.com/apache/spark/pull/24533#discussion_r283723768
##########
File path:
core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
##########
@@ -77,6 +87,68 @@ class NettyBlockTransferServiceSuite
verifyServicePort(expectedPort = service0.port + 1, actualPort =
service1.port)
}
+ test("SPARK-27637: test fetch block with executor dead") {
+ implicit val exectionContext = ExecutionContext.global
+ val port = 17634 + Random.nextInt(10000)
+ logInfo("random port for test: " + port)
+
+ val driverEndpointRef = new RpcEndpointRef(new SparkConf()) {
+ override def address: RpcAddress = null
+ override def name: String = "test"
+ override def send(message: Any): Unit = {}
+ // This rpcEndPointRef always return false for unit test to touch
ExecutorDeadException.
+ override def ask[T: ClassTag](message: Any, timeout: RpcTimeout):
Future[T] = {
+ Future{false.asInstanceOf[T]}
+ }
+ }
+
+ val clientFactory = mock(classOf[TransportClientFactory])
+ val client = mock(classOf[TransportClient])
+ // This is used to touch an IOException during fetching block.
+ when(client.sendRpc(any(), any())).thenAnswer(_ => {throw new
IOException()})
+ when(clientFactory.createClient(any(), any())).thenReturn(client)
+
+ val listener = mock(classOf[BlockFetchingListener])
+ // This is used to throw a ExecutorDeadException for unit test, when the
fetch is failed
+ // because of ExecutorDeadException.
+ when(listener.onBlockFetchFailure(any(),
any(classOf[ExecutorDeadException])))
+ .thenAnswer(_ => {throw new ExecutorDeadException("Executor is dead.")})
+
+ service0 = createService(port, driverEndpointRef)
+ val clientFactoryField = service0.getClass.getField(
+
"org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory")
+ clientFactoryField.setAccessible(true)
+ clientFactoryField.set(service0, clientFactory)
+
+ // The ExecutorDeadException is thrown by listener when fetch failed for
ExecutorDeadException.
+ intercept[ExecutorDeadException](service0.fetchBlocks("localhost", port,
"exec1",
+ Array("block1"), listener, mock(classOf[DownloadFileManager])))
+
+ val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
+ override def createAndStart(blockIds: Array[String], listener:
BlockFetchingListener) {
+ try {
+ new OneForOneBlockFetcher(client, "appId", "execId", blockIds,
listener,
+ mock(classOf[TransportConf]),
mock(classOf[DownloadFileManager])).start()
+ } catch {
+ case e: IOException =>
+ if (driverEndpointRef.askSync[Boolean](IsExecutorAlive("execId")))
{
Review comment:
This is the code we want to test; we should not put it in the test...
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]