Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10506#discussion_r121610617
  
    --- Diff: 
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala ---
    @@ -134,6 +138,79 @@ class MasterSuite extends SparkFunSuite
         CustomRecoveryModeFactory.instantiationAttempts should be > 
instantiationAttempts
       }
     
    +  test("master correctly recover the application") {
    +    val conf = new SparkConf(loadDefaults = false)
    +    conf.set("spark.deploy.recoveryMode", "CUSTOM")
    +    conf.set("spark.deploy.recoveryMode.factory",
    +      classOf[FakeRecoveryModeFactory].getCanonicalName)
    +    conf.set("spark.master.rest.enabled", "false")
    +
    +    val fakeAppInfo = makeAppInfo(1024)
    +    val fakeWorkerInfo = makeWorkerInfo(8192, 16)
    +    val fakeDriverInfo = new DriverInfo(
    +      startTime = 0,
    +      id = "test_driver",
    +      desc = new DriverDescription(
    +        jarUrl = "",
    +        mem = 1024,
    +        cores = 1,
    +        supervise = false,
    +        command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
    +      submitDate = new Date())
    +
    +    // Build the fake recovery data
    +    FakeRecoveryModeFactory.persistentData.put(s"app_${fakeAppInfo.id}", 
fakeAppInfo)
    +    
FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", 
fakeDriverInfo)
    +    
FakeRecoveryModeFactory.persistentData.put(s"worker_${fakeWorkerInfo.id}", 
fakeWorkerInfo)
    +
    +    var master: Master = null
    +    try {
    +      master = makeMaster(conf)
    +      master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
    +      // Wait until Master recover from checkpoint data.
    +      eventually(timeout(5 seconds), interval(100 milliseconds)) {
    +        master.idToApp.size should be(1)
    +      }
    +
    +      master.idToApp.keySet should be(Set(fakeAppInfo.id))
    +      getDrivers(master) should be(Set(fakeDriverInfo))
    +      master.workers should be(Set(fakeWorkerInfo))
    +
    +      // Notify Master about the executor and driver info to make it 
correctly recovered.
    +      val fakeExecutors = List(
    +        new ExecutorDescription(fakeAppInfo.id, 0, 8, 
ExecutorState.RUNNING),
    +        new ExecutorDescription(fakeAppInfo.id, 0, 7, 
ExecutorState.RUNNING))
    +
    +      fakeAppInfo.state should be(ApplicationState.UNKNOWN)
    +
    +      master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
    +      master.self.send(
    +        WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, 
Seq(fakeDriverInfo.id)))
    +
    +      eventually(timeout(1 second), interval(10 milliseconds)) {
    --- End diff --
    
    Because RPC `send` is asynchronous, if we check the app state immediately 
after `send` we will get the "UNKNOWN" state instead of "WAITING", which is why 
the check is wrapped in `eventually`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to