GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/10506#discussion_r121609933
--- Diff:
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala ---
@@ -134,6 +138,79 @@ class MasterSuite extends SparkFunSuite
CustomRecoveryModeFactory.instantiationAttempts should be >
instantiationAttempts
}
+ test("master correctly recover the application") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set("spark.deploy.recoveryMode", "CUSTOM")
+ conf.set("spark.deploy.recoveryMode.factory",
+ classOf[FakeRecoveryModeFactory].getCanonicalName)
+ conf.set("spark.master.rest.enabled", "false")
+
+ val fakeAppInfo = makeAppInfo(1024)
+ val fakeWorkerInfo = makeWorkerInfo(8192, 16)
+ val fakeDriverInfo = new DriverInfo(
+ startTime = 0,
+ id = "test_driver",
+ desc = new DriverDescription(
+ jarUrl = "",
+ mem = 1024,
+ cores = 1,
+ supervise = false,
+ command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
+ submitDate = new Date())
+
+ // Build the fake recovery data
+ FakeRecoveryModeFactory.persistentData.put(s"app_${fakeAppInfo.id}",
fakeAppInfo)
+
FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}",
fakeDriverInfo)
+
FakeRecoveryModeFactory.persistentData.put(s"worker_${fakeWorkerInfo.id}",
fakeWorkerInfo)
+
+ var master: Master = null
+ try {
+ master = makeMaster(conf)
+ master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+ // Wait until Master recover from checkpoint data.
+ eventually(timeout(5 seconds), interval(100 milliseconds)) {
+ master.idToApp.size should be(1)
+ }
+
+ master.idToApp.keySet should be(Set(fakeAppInfo.id))
+ getDrivers(master) should be(Set(fakeDriverInfo))
+ master.workers should be(Set(fakeWorkerInfo))
+
+ // Notify Master about the executor and driver info to make it
correctly recovered.
+ val fakeExecutors = List(
+ new ExecutorDescription(fakeAppInfo.id, 0, 8,
ExecutorState.RUNNING),
+ new ExecutorDescription(fakeAppInfo.id, 0, 7,
ExecutorState.RUNNING))
+
+ fakeAppInfo.state should be(ApplicationState.UNKNOWN)
+
+ master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
+ master.self.send(
+ WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors,
Seq(fakeDriverInfo.id)))
+
+ eventually(timeout(1 second), interval(10 milliseconds)) {
--- End diff --
Hmm, will this be flaky? A 1-second timeout (polling every 10 ms) may be too tight on a slow or heavily loaded CI machine.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]