cloud-fan commented on a change in pull request #24533: [SPARK-27637][Shuffle] 
For nettyBlockTransferService, if IOException occurred while fetching data, 
check whether relative executor is alive  before retry
URL: https://github.com/apache/spark/pull/24533#discussion_r283723999
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
 ##########
 @@ -77,6 +87,68 @@ class NettyBlockTransferServiceSuite
     verifyServicePort(expectedPort = service0.port + 1, actualPort = 
service1.port)
   }
 
+  test("SPARK-27637: test fetch block with executor dead") {
+    implicit val exectionContext = ExecutionContext.global
+    val port = 17634 + Random.nextInt(10000)
+    logInfo("random port for test: " + port)
+
+    val driverEndpointRef = new RpcEndpointRef(new SparkConf()) {
+      override def address: RpcAddress = null
+      override def name: String = "test"
+      override def send(message: Any): Unit = {}
+      // This rpcEndPointRef always return false for unit test to touch 
ExecutorDeadException.
+      override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): 
Future[T] = {
+        Future{false.asInstanceOf[T]}
+      }
+    }
+
+    val clientFactory = mock(classOf[TransportClientFactory])
+    val client = mock(classOf[TransportClient])
+    // This is used to touch an IOException during fetching block.
+    when(client.sendRpc(any(), any())).thenAnswer(_ => {throw new 
IOException()})
+    when(clientFactory.createClient(any(), any())).thenReturn(client)
+
+    val listener = mock(classOf[BlockFetchingListener])
+    // This is used to throw a ExecutorDeadException for unit test, when the 
fetch is failed
+    // because of ExecutorDeadException.
+    when(listener.onBlockFetchFailure(any(), 
any(classOf[ExecutorDeadException])))
+      .thenAnswer(_ => {throw new ExecutorDeadException("Executor is dead.")})
+
+    service0 = createService(port, driverEndpointRef)
+    val clientFactoryField = service0.getClass.getField(
+      
"org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory")
+    clientFactoryField.setAccessible(true)
+    clientFactoryField.set(service0, clientFactory)
+
+    // The ExecutorDeadException is thrown by listener when fetch failed for 
ExecutorDeadException.
+    intercept[ExecutorDeadException](service0.fetchBlocks("localhost", port, 
"exec1",
+      Array("block1"), listener, mock(classOf[DownloadFileManager])))
+
+    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
+      override def createAndStart(blockIds: Array[String], listener: 
BlockFetchingListener) {
+        try {
+          new OneForOneBlockFetcher(client, "appId", "execId", blockIds, 
listener,
+            mock(classOf[TransportConf]), 
mock(classOf[DownloadFileManager])).start()
+        } catch {
+          case e: IOException =>
+            if (driverEndpointRef.askSync[Boolean](IsExecutorAlive("execId"))) 
{
 
 Review comment:
   one principle of writing a regression test: it should fail without the patch

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to