Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6291#discussion_r41210828
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -1256,6 +1259,136 @@ class DAGSchedulerSuite
         assertDataStructuresEmpty()
       }
     
    +  def checkJobProperties(taskSet: TaskSet, expected: String): Unit = {
    +    assert(taskSet.properties != null)
    +    assert(taskSet.properties.getProperty("testProperty") === expected)
    +  }
    +
    +  /**
    +   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties
    +   * of a later, active job if they were previously run under a job that is no longer active.
    +   */
    +  test("stage used by two jobs, the first no longer active (SPARK-6880)") {
    +    val baseRdd = new MyRDD(sc, 1, Nil)
    +    val shuffleDep1 = new ShuffleDependency(baseRdd, null)
    +    val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
    +    val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
    +    val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
    +    val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
    +    val job1Properties = new Properties()
    +    val job2Properties = new Properties()
    +    job1Properties.setProperty("testProperty", "job1")
    +    job2Properties.setProperty("testProperty", "job2")
    +
    +    // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
    +    // Note that we have to submit job2 before we cancel job1 to have them actually share
    +    // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
    +    // we address SPARK-10193).
    +    val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
    +    val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
    +    assert(scheduler.activeJobs.nonEmpty)
    +    val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
    +
    +    // remove job1 as an ActiveJob
    +    cancel(jobId1)
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +
    +    // job2 should still be running
    +    assert(scheduler.activeJobs.nonEmpty)
    +    val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
    +    assert(testProperty1 != testProperty2)
    +    assert(taskSets(0).properties != null)
    +    // NB: This next assert isn't necessarily the "desired" behavior; it's just to document
    +    // the current behavior.  We've already submitted the TaskSet for stage 0 based on job1,
    +    // but even though we have cancelled that job and are now running it because of job2, we
    +    // haven't updated the TaskSet's properties.  Changing the properties to "job2" is likely
    +    // the more correct behavior.
    +    checkJobProperties(taskSets(0), "job1")
    --- End diff --
    
    Looks like everything from line 1272 to here is the same as in the next test function; can 
    you move this into a shared helper function, "launchJobsThatShareStageAndCancelFirst" or 
    something?
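    
    For reference, a rough sketch of what that extraction could look like, built entirely from 
    the code quoted in the diff above (the helper name is the one proposed here; whether it 
    should return anything, e.g. the shuffle dependencies for the retry test, is an open choice):
    
        // Sketch only: all identifiers used below (MyRDD, submit, cancel,
        // WAIT_TIMEOUT_MILLIS, taskSets, etc.) come from the diff above.
        private def launchJobsThatShareStageAndCancelFirst(): Unit = {
          val baseRdd = new MyRDD(sc, 1, Nil)
          val shuffleDep1 = new ShuffleDependency(baseRdd, null)
          val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
          val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
          val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
          val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
          val job1Properties = new Properties()
          val job2Properties = new Properties()
          job1Properties.setProperty("testProperty", "job1")
          job2Properties.setProperty("testProperty", "job2")
    
          // Submit job2 before cancelling job1 so the two jobs actually share
          // stages (see the SPARK-10193 note in the test above).
          val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
          val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
          assert(scheduler.activeJobs.nonEmpty)
          val testProperty1 =
            scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
    
          // Remove job1 as an ActiveJob; job2 should still be running.
          cancel(jobId1)
          sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
          assert(scheduler.activeJobs.nonEmpty)
          val testProperty2 =
            scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
          assert(testProperty1 != testProperty2)
          assert(taskSets(0).properties != null)
        }
    
    Each test would then call this helper and follow up with its own checkJobProperties 
    assertions.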

