Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/15505#discussion_r96083494
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Launch tasks returned by a set of resource offers
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        val serializedTask = TaskDescription.encode(task)
-        if (serializedTask.limit >= maxRpcMessageSize) {
-          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
-            try {
-              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
-                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
-                "spark.rpc.message.maxSize or using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
-              taskSetMgr.abort(msg)
-            } catch {
-              case e: Exception => logError("Exception in error callback", e)
-            }
-          }
+        val serializedTask = try {
+          TaskDescription.encode(task)
+        } catch {
+          case NonFatal(e) =>
+            abortTaskSetManager(scheduler, task.taskId,
--- End diff ---
hmm, what happens here if *one* of the tasks can't be serialized (or is too
big, etc.)? We'll abort the taskset, but won't we still send out the
`LaunchTask` events for all the other tasks? That wouldn't happen before.
This may actually be a big problem -- we might lose the performance
benefit if we first have to serialize all the tasks to make sure they all
work, and only then send all the messages.
Maybe it's OK to still have the earlier tasks start, but it seems like we
should at least prevent any *more* tasks from starting, and stop trying to
serialize every remaining task (a sketch of what I mean follows the log
excerpt below). I just modified the test case in
`CoarseGrainedSchedulerBackendSuite` so it waits for multiple executors to come
up before submitting the bad job, and the logs do show 2 instances of
```
17/01/13 16:12:03.213 dispatcher-event-loop-3 ERROR TaskDescription: Failed to serialize task 2, not attempting to retry it.
java.io.NotSerializableException
	at org.apache.spark.scheduler.NotSerializablePartitionRDD$$anonfun$getPartitions$1$$anon$1.writeObject(CoarseGrainedSchedulerBackendSuite.scala:38)
```
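To make that concrete, here's a minimal, self-contained sketch of the control
flow I have in mind -- not the real scheduler code. `TaskDescription`,
`encode`, `abort`, and `send` below are stand-ins I made up for this sketch
(`abort` playing the role of this PR's `abortTaskSetManager`): serialize and
launch one task at a time, and as soon as one fails to serialize, abort and
skip everything that's left.

```scala
import java.nio.ByteBuffer
import scala.util.control.NonFatal

// Sketch only: stand-ins for the real scheduler pieces, just to show the
// control flow. None of this is the actual Spark code.
object LaunchLoopSketch {
  case class TaskDescription(taskId: Long, index: Int)

  // Stand-in for TaskDescription.encode; may throw for an unserializable task.
  def encode(task: TaskDescription): ByteBuffer = ByteBuffer.allocate(64)

  // Stand-in for aborting the task's TaskSetManager (the PR's abortTaskSetManager).
  def abort(taskId: Long, msg: String, cause: Throwable): Unit =
    println(s"aborting task set for task $taskId: $msg ($cause)")

  // Stand-in for sending the LaunchTask RPC to the executor.
  def send(task: TaskDescription, buf: ByteBuffer): Unit =
    println(s"LaunchTask ${task.taskId} (${buf.limit} bytes)")

  def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
    val it = tasks.flatten.iterator
    var failed = false
    while (!failed && it.hasNext) {
      val task = it.next()
      try {
        // Serialize one task at a time and send it immediately, keeping the
        // one-pass behavior (no "serialize everything, then send" phase).
        val buf = encode(task)
        send(task, buf)
      } catch {
        case NonFatal(e) =>
          abort(task.taskId,
            s"Failed to serialize task ${task.taskId}, not attempting to retry it.", e)
          // Bail out: don't serialize or launch any of the remaining tasks.
          failed = true
      }
    }
  }
}
```

A finer-grained version could keep launching tasks from *other* task sets and
only skip the ones belonging to the aborted set, but even this coarse bail-out
avoids serializing every remaining task just to throw the work away.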