cloud-fan commented on a change in pull request #28924:
URL: https://github.com/apache/spark/pull/28924#discussion_r452164478
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -177,6 +181,95 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
blockManager.stop()
}
+ private def setupBlockManagerMasterWithBlocks(withLost: Boolean): Unit = {
+ // set up a simple DriverEndpoint which simply adds executorIds and
+ // checks whether a certain executorId has been added before.
+ val driverEndpoint =
rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
+ new RpcEndpoint {
+ private val executorSet = mutable.HashSet[String]()
+ override val rpcEnv: RpcEnv = this.rpcEnv
+ override def receiveAndReply(context: RpcCallContext):
PartialFunction[Any, Unit] = {
+ case CoarseGrainedClusterMessages.RegisterExecutor(executorId, _, _,
_, _, _, _, _) =>
+ executorSet += executorId
+ context.reply(true)
+ case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
+ context.reply(executorSet.contains(executorId))
+ }
+ }
+ )
+
+ def createAndRegisterBlockManager(timeout: Boolean): BlockManagerId = {
+ val id = if (timeout) "timeout" else "normal"
+ val bmRef = rpcEnv.setupEndpoint(s"bm-$id", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = this.rpcEnv
+ private def reply[T](context: RpcCallContext, response: T): Unit = {
+ if (timeout) {
+ Thread.sleep(conf.getTimeAsMs(Network.RPC_ASK_TIMEOUT.key) + 1000)
+ }
+ context.reply(response)
+ }
+
+ override def receiveAndReply(context: RpcCallContext):
PartialFunction[Any, Unit] = {
+ case RemoveRdd(_) => reply(context, 1)
+ case RemoveBroadcast(_, _) => reply(context, 1)
+ case RemoveShuffle(_) => reply(context, true)
+ }
+ })
+ val bmId = BlockManagerId(s"exec-$id", "localhost", 1234, None)
+ master.registerBlockManager(bmId, Array.empty, 2000, 0, bmRef)
+ }
+
+ // set up normal bm1
+ val bm1Id = createAndRegisterBlockManager(false)
+ // set up bm2, which intentionally takes more time than RPC_ASK_TIMEOUT to
+ // remove rdd/broadcast/shuffle in order to raise timeout error
+ val bm2Id = createAndRegisterBlockManager(true)
+
+
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.RegisterExecutor(
+ bm1Id.executorId, null, bm1Id.host, 1, Map.empty, Map.empty,
+ Map.empty, 0))
+
+ if (!withLost) {
Review comment:
What does `withLost` mean?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]