Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6291#discussion_r44179234
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -1275,6 +1278,104 @@ class DAGSchedulerSuite
         assertDataStructuresEmpty()
       }
     
    +  def checkJobProperties(taskSet: TaskSet, expected: String): Unit = {
    +    assert(taskSet.properties != null)
    +    assert(taskSet.properties.getProperty("testProperty") === expected)
    +  }
    +
    +  def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
    +    val baseRdd = new MyRDD(sc, 1, Nil)
    +    val shuffleDep1 = new ShuffleDependency(baseRdd, null)
    +    val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
    +    val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
    +    val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
    +    val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
    +    val job1Properties = new Properties()
    +    val job2Properties = new Properties()
    +    job1Properties.setProperty("testProperty", "job1")
    +    job2Properties.setProperty("testProperty", "job2")
    +
    +    // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
    +    // Note that we have to submit job2 before we cancel job1 to have them actually share
    +    // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
    +    // we address SPARK-10193.)
    +    val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
    +    val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
    +    assert(scheduler.activeJobs.nonEmpty)
    +    val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
    +
    +    // remove job1 as an ActiveJob
    +    cancel(jobId1)
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +
    +    // job2 should still be running
    +    assert(scheduler.activeJobs.nonEmpty)
    +    val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
    +    assert(testProperty1 != testProperty2)
    +    // NB: This next assert isn't necessarily the "desired" behavior; it's just to document
    +    // the current behavior.  We've already submitted the TaskSet for stage 0 based on job1, but
    +    // even though we have cancelled that job and are now running it because of job2, we haven't
    +    // updated the TaskSet's properties.  Changing the properties to "job2" is likely the more
    +    // correct behavior.
    +    checkJobProperties(taskSets(0), "job1")
    +    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
    +
    +    shuffleDep1
    +  }
    +
    +  /**
    +   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties
    +   * of a later, active job if they were previously run under a job that is no longer active
    +   */
    +  test("stage used by two jobs, the first no longer active (SPARK-6880)") {
    +    launchJobsThatShareStageAndCancelFirst()
    +
    +    // The next check is the key for SPARK-6880.  For the stage which was shared by both job1 and
    +    // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
    +    // the stage.
    +    checkJobProperties(taskSets(1), "job2")
    --- End diff --
    
    Pretty minor, but I realized that we're only checking the job properties, while you're also
    fixing the jobId of this taskSet.  Could you add that check here and elsewhere?  (I guess that
    includes checking for the "wrong" jobId in `taskSets(0)` after we cancel job1.)
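    
    Something along these lines is what I had in mind -- just a sketch, assuming the TaskSet's
    `priority` field is what carries the submitting job's id, and with the helper name purely
    illustrative:
    
        def checkJobPropertiesAndJobId(taskSet: TaskSet, expected: String, expectedJobId: Int): Unit = {
          assert(taskSet.properties != null)
          assert(taskSet.properties.getProperty("testProperty") === expected)
          // assumes TaskSet.priority holds the id of the job the stage was submitted for
          assert(taskSet.priority === expectedJobId)
        }
    
        // e.g. inside launchJobsThatShareStageAndCancelFirst(), in place of the existing check,
        // documenting that taskSets(0) still carries job1's id even after job1 is cancelled:
        checkJobPropertiesAndJobId(taskSets(0), "job1", jobId1)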

