Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/16620#discussion_r99615666
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,96 @@ class DAGSchedulerSuite extends SparkFunSuite with
LocalSparkContext with Timeou
}
}
+ test("[SPARK-19263] DAGScheduler should avoid sending conflicting task
set") {
+ val mockTaskSchedulerImpl = new TaskSchedulerImpl(sc) {
+ override def submitTasks(taskSet: TaskSet): Unit = {
+ super.submitTasks(taskSet)
+ taskSets += taskSet
+ }
+ }
+ val mockDAGScheduler = new DAGScheduler(
+ sc,
+ mockTaskSchedulerImpl,
+ sc.listenerBus,
+ mapOutputTracker,
+ blockManagerMaster,
+ sc.env
+ ) {
+ override def taskEnded(
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: Seq[AccumulatorV2[_, _]],
+ taskInfo: TaskInfo): Unit = {
+ dagEventProcessLoopTester.post(
+ CompletionEvent(task, reason, result, accumUpdates, taskInfo))
+ }
+ }
+
+ val mockSchedulerBackend = new SchedulerBackend {
+ override def stop(): Unit = {}
+
+ override def defaultParallelism(): Int = 2
+
+ override def reviveOffers(): Unit = {}
+
+ override def start(): Unit = {}
+ }
+
+ def getTaskSetManagerByTask(task: Task[_]): TaskSetManager = {
+ val taskSetManagerOpt = mockTaskSchedulerImpl
+ .taskSetManagerForAttempt(task.stageId, task.stageAttemptId)
+ assert(taskSetManagerOpt.isDefined)
+ taskSetManagerOpt.get
+ }
+
+ def resourceOffer(taskSetManager: TaskSetManager, host: String,
execId: String): Unit = {
+ taskSetManager.resourceOffer(execId, host, TaskLocality.ANY)
+ }
+
+ def taskSuccessful(tsm: TaskSetManager, task: Task[_], result: Any):
Unit = {
+ val taskIdOpt = tsm.taskInfos.find(_._2.index == task.partitionId)
--- End diff --
This would be a little cleaner if `tsm` didn't need to be passed in; instead,
the method could look it up itself with `val tsm = getTaskSetManagerByTask(task)`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]