GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10506#discussion_r121576458
  
    --- Diff: 
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala ---
    @@ -134,6 +138,71 @@ class MasterSuite extends SparkFunSuite
         CustomRecoveryModeFactory.instantiationAttempts should be > 
instantiationAttempts
       }
     
    +  test("master correctly recover the application") {
    +    val conf = new SparkConf(loadDefaults = false)
    +    conf.set("spark.deploy.recoveryMode", "CUSTOM")
    +    conf.set("spark.deploy.recoveryMode.factory",
    +      classOf[FakeRecoveryModeFactory].getCanonicalName)
    +    conf.set("spark.master.rest.enabled", "false")
    +
    +    val fakeAppInfo = makeAppInfo(1024)
    +    val fakeWorkerInfo = makeWorkerInfo(8192, 16)
    +    val fakeDriverInfo = new DriverInfo(
    +      startTime = 0,
    +      id = "test_driver",
    +      desc = new DriverDescription(
    +        jarUrl = "",
    +        mem = 1024,
    +        cores = 1,
    +        supervise = false,
    +        command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
    +      submitDate = new Date())
    +
    +    // Build the fake recovery data
    +    FakeRecoveryModeFactory.persistentData.put(s"app_${fakeAppInfo.id}", 
fakeAppInfo)
    +    
FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", 
fakeDriverInfo)
    +    
FakeRecoveryModeFactory.persistentData.put(s"worker_${fakeWorkerInfo.id}", 
fakeWorkerInfo)
    +
    +    var master: Master = null
    +    try {
    +      master = makeMaster(conf)
    +      master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
    +      // Wait until Master recover from checkpoint data.
    +      eventually(timeout(5 seconds), interval(100 milliseconds)) {
    +        master.idToApp.size should be(1)
    +      }
    +
    +      master.idToApp.keySet should be(Set(fakeAppInfo.id))
    +      getDrivers(master) should be(Set(fakeDriverInfo))
    +      master.workers should be(Set(fakeWorkerInfo))
    +
    +      // Notify Master about the executor and driver info to make it 
correctly recovered.
    +      val fakeExecutors = List(
    +        new ExecutorDescription(fakeAppInfo.id, 0, 8, 
ExecutorState.RUNNING),
    +        new ExecutorDescription(fakeAppInfo.id, 0, 7, 
ExecutorState.RUNNING))
    +      master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
    +      master.self.send(
    +        WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, 
Seq(fakeDriverInfo.id)))
    +
    +      eventually(timeout(5 seconds), interval(100 microseconds)) {
    +        getState(master) should be(RecoveryState.ALIVE)
    +      }
    +
    +      // If driver's resource is also counted, free cores should 0
    +      fakeWorkerInfo.coresFree should be(0)
    +      fakeWorkerInfo.coresUsed should be(16)
    +      // State of application should be RUNNING
    +      fakeAppInfo.state should be(ApplicationState.RUNNING)
    --- End diff --
    
    Shall we also check these values before the recovery, to show that recovering
actually changes them?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to