Github user witgo commented on a diff in the pull request:
https://github.com/apache/spark/pull/15505#discussion_r95753220
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,43 @@ private[spark] class TaskDescription(
val addedFiles: Map[String, Long],
val addedJars: Map[String, Long],
val properties: Properties,
- val serializedTask: ByteBuffer) {
+ private var serializedTask_ : ByteBuffer) extends Logging {
--- End diff --
How about this?
``` scala
/**
 * Description of a task that is sent to an executor to be run.
 *
 * The task is (de)serialized lazily. When built on the driver from a `Task` object (auxiliary
 * constructor), the bytes are produced only the first time `serializedTask` is read, so the
 * serialization can be postponed to later in the scheduling process (for example onto another
 * thread in CoarseGrainedSchedulerBackend). When built from already-serialized bytes (primary
 * constructor), the `Task` object is deserialized on the first call to `getTask`.
 *
 * @param serializedTask_ serialized task bytes, or null when the instance was built from a
 *                        `Task` object and has not been serialized yet
 */
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int, // Index within this task's TaskSet
    val addedFiles: Map[String, Long],
    val addedJars: Map[String, Long],
    val properties: Properties,
    private var serializedTask_ : ByteBuffer) extends Logging {

  // In-memory task object: set by the auxiliary constructor, or filled in lazily by getTask.
  private var task_ : Task[_] = null

  /**
   * Builds a description from an in-memory task without serializing it yet; serialization is
   * deferred until `serializedTask` is first accessed.
   */
  def this(
      taskId: Long,
      attemptNumber: Int,
      executorId: String,
      name: String,
      index: Int, // Index within this task's TaskSet
      addedFiles: Map[String, Long],
      addedJars: Map[String, Long],
      properties: Properties,
      task: Task[_]) {
    this(taskId, attemptNumber, executorId, name, index,
      addedFiles, addedJars, properties, null.asInstanceOf[ByteBuffer])
    task_ = task
  }

  /**
   * Serialized form of the task, computed on first access and cached.
   *
   * @throws TaskNotSerializableException if the task cannot be serialized (the error is logged
   *                                      first and the task is not retried)
   */
  private def serializedTask: ByteBuffer = {
    if (serializedTask_ == null) {
      // This is where we serialize the task on the driver before sending it to the executor.
      // This is not done when creating the TaskDescription so we can postpone this serialization
      // to later in the scheduling process -- particularly, so it can happen in another thread
      // by the CoarseGrainedSchedulerBackend.
      // On the executors, this will already be populated by decode.
      serializedTask_ = try {
        ByteBuffer.wrap(Utils.serialize(task_))
      } catch {
        case NonFatal(e) =>
          val msg = s"Failed to serialize task $taskId, not attempting to retry it."
          logError(msg, e)
          throw new TaskNotSerializableException(e)
      }
    }
    serializedTask_
  }

  /**
   * Returns the task, deserializing it with `loader` on first call if only bytes are held.
   *
   * @param loader class loader used to resolve the task's classes during deserialization
   */
  def getTask(loader: ClassLoader): Task[_] = {
    if (task_ == null) {
      // NOTE(review): assumes Utils.deserialize takes an Array[Byte] (it mirrors Utils.serialize,
      // whose result is wrapped into a ByteBuffer above) -- confirm. Copy the remaining bytes via
      // a duplicate so the cached buffer's position/limit are left untouched.
      val buf = serializedTask.duplicate()
      val bytes = new Array[Byte](buf.remaining())
      buf.get(bytes)
      task_ = Utils.deserialize[Task[_]](bytes, loader)
    }
    task_
  }

  override def toString: String = s"TaskDescription(TID=$taskId, index=$index)"
}
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]