Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6291#discussion_r44179234
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1275,6 +1278,104 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
+ def checkJobProperties(taskSet: TaskSet, expected: String): Unit = {
+ assert(taskSet.properties != null)
+ assert(taskSet.properties.getProperty("testProperty") === expected)
+ }
+
+ def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int,
Int, Nothing] = {
+ val baseRdd = new MyRDD(sc, 1, Nil)
+ val shuffleDep1 = new ShuffleDependency(baseRdd, null)
+ val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
+ val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
+ val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
+ val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
+ val job1Properties = new Properties()
+ val job2Properties = new Properties()
+ job1Properties.setProperty("testProperty", "job1")
+ job2Properties.setProperty("testProperty", "job2")
+
+ // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
+ // Note that we have to submit job2 before we cancel job1 to have them
actually share
+ // *Stages*, and not just shuffle dependencies, due to skipped stages
(at least until
+ // we address SPARK-10193.)
+ val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
+ val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
+ assert(scheduler.activeJobs.nonEmpty)
+ val testProperty1 =
scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
+
+ // remove job1 as an ActiveJob
+ cancel(jobId1)
+ sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+ // job2 should still be running
+ assert(scheduler.activeJobs.nonEmpty)
+ val testProperty2 =
scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
+ assert(testProperty1 != testProperty2)
+ // NB: This next assert isn't necessarily the "desired" behavior; it's
just to document
+ // the current behavior. We've already submitted the TaskSet for
stage 0 based on job1, but
+ // even though we have cancelled that job and are now running it
because of job2, we haven't
+ // updated the TaskSet's properties. Changing the properties to
"job2" is likely the more
+ // correct behavior.
+ checkJobProperties(taskSets(0), "job1")
+ complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+
+ shuffleDep1
+ }
+
+ /**
+ * Makes sure that tasks for a stage used by multiple jobs are submitted
with the properties of a
+ * later, active job if they were previously run under a job that is no
longer active
+ */
+ test("stage used by two jobs, the first no longer active (SPARK-6880)") {
+ launchJobsThatShareStageAndCancelFirst()
+
+ // The next check is the key for SPARK-6880. For the stage which was
shared by both job1 and
+ // job2 but never had any tasks submitted for job1, the properties of
job2 are now used to run
+ // the stage.
+ checkJobProperties(taskSets(1), "job2")
--- End diff --
Pretty minor, but I realized that we're only checking the job properties here; you're also fixing the jobId of this taskSet. Could you add that check
here and elsewhere? (I guess including a check for the "wrong" jobId in
`taskSets(0)` after we cancel job1.)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]