cloud-fan commented on a change in pull request #24533: [SPARK-27637][Shuffle]
For nettyBlockTransferService, if IOException occurred while fetching data,
check whether relative executor is alive before retry
URL: https://github.com/apache/spark/pull/24533#discussion_r285106531
##########
File path:
core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
##########
@@ -77,6 +85,51 @@ class NettyBlockTransferServiceSuite
verifyServicePort(expectedPort = service0.port + 1, actualPort =
service1.port)
}
+ test("SPARK-27637: test fetch block with executor dead") {
+ implicit val exectionContext = ExecutionContext.global
+ val port = 17634 + Random.nextInt(10000)
+ logInfo("random port for test: " + port)
+
+ val driverEndpointRef = new RpcEndpointRef(new SparkConf()) {
+ override def address: RpcAddress = null
+ override def name: String = "test"
+ override def send(message: Any): Unit = {}
+ // This rpcEndPointRef always return false for unit test to touch
ExecutorDeadException.
+ override def ask[T: ClassTag](message: Any, timeout: RpcTimeout):
Future[T] = {
+ Future{false.asInstanceOf[T]}
+ }
+ }
+
+ val clientFactory = mock(classOf[TransportClientFactory])
+ val client = mock(classOf[TransportClient])
+ // This is used to touch an IOException during fetching block.
+ when(client.sendRpc(any(), any())).thenAnswer(_ => {throw new
IOException()})
+ var createClientCount = 0
+ when(clientFactory.createClient(any(), any())).thenAnswer(_ => {
+ createClientCount += 1
+ client
+ })
+
+ val listener = mock(classOf[BlockFetchingListener])
+ // This is used to throw a ExecutorDeadException for unit test, when the
fetch is failed
+ // because of ExecutorDeadException.
+ when(listener.onBlockFetchFailure(any(),
any(classOf[ExecutorDeadException])))
+ .thenAnswer(_ => {throw new ExecutorDeadException("Executor is dead.")})
Review comment:
nit:
```
var hitExecutorDeadException = false
when(listener.onBlockFetchFailure(any(),
any(classOf[ExecutorDeadException])))
.thenAnswer(_ => {hitExecutorDeadException = true})
...
service0.fetchBlocks...
assert(createClientCount === 1)
assert(hitExecutorDeadException)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]