Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3638#discussion_r22201978
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -331,4 +331,34 @@ class TaskSchedulerImplSuite extends FunSuite with 
LocalSparkContext with Loggin
         assert(1 === taskDescriptions.length)
         assert("executor0" === taskDescriptions(0).executorId)
       }
    +
    +  test("Scheduler does not crash when tasks are not serializable") {
    +    sc = new SparkContext("local", "TaskSchedulerImplSuite")
    +    val taskCpus = 2
    +
    +    sc.conf.set("spark.task.cpus", taskCpus.toString)
    +    val taskScheduler = new TaskSchedulerImpl(sc)
    +    taskScheduler.initialize(new FakeSchedulerBackend)
    +    // Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
    +    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
    +      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
    +      override def executorAdded(execId: String, host: String) {}
    +    }
    +    val numFreeCores = 1
    +    taskScheduler.setDAGScheduler(dagScheduler)
    +    var taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new 
NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
    +    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 
taskCpus),
    +      new WorkerOffer("executor1", "host1", numFreeCores))
    +    taskScheduler.submitTasks(taskSet)
    +    var taskDescriptions = 
taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
    +    assert(0 === taskDescriptions.length)
    +
    +    // Now check that we can still submit tasks
    +    taskSet = FakeTask.createTaskSet(1)
    +    taskScheduler.submitTasks(taskSet)
    +    taskDescriptions = 
taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
    +    assert(1 === taskDescriptions.length)
    --- End diff --
    
    Minor style point, but I think you can combine this assertion with the 
following one, which will give more informative error messages if the assertion 
fails (since now you'll see the difference in contents if the lengths are not 
equal):
    
    ```scala
    assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to