Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/15505#discussion_r93079839
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -17,27 +17,179 @@
package org.apache.spark.scheduler
+import java.io._
import java.nio.ByteBuffer
+import java.util.Properties
-import org.apache.spark.util.SerializableBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.util.{ByteBufferInputStream,
ByteBufferOutputStream}
/**
* Description of a task that gets passed onto executors to be executed,
usually created by
* [[TaskSetManager.resourceOffer]].
*/
-private[spark] class TaskDescription(
+private[spark] class TaskDescription private(
val taskId: Long,
val attemptNumber: Int,
val executorId: String,
val name: String,
- val index: Int, // Index within this task's TaskSet
- _serializedTask: ByteBuffer)
- extends Serializable {
+ val index: Int,
+ val taskFiles: mutable.Map[String, Long],
+ val taskJars: mutable.Map[String, Long],
+ private var task_ : Task[_],
+ private var taskBytes: InputStream,
+ private var taskProps: Properties) {
+
+ def this(
+ taskId: Long,
+ attemptNumber: Int,
+ executorId: String,
+ name: String,
+ index: Int,
+ taskFiles: mutable.Map[String, Long],
+ taskJars: mutable.Map[String, Long],
+ task: Task[_]) {
+ this(taskId, attemptNumber, executorId, name, index, taskFiles,
taskJars, task,
+ null.asInstanceOf[InputStream],
+ null.asInstanceOf[Properties])
+ }
+
+ @throws[IOException]
+ def encode(serializer: SerializerInstance): ByteBuffer = {
+ val out = new ByteBufferOutputStream(4096)
+ encode(out, serializer)
+ out.close()
+ out.toByteBuffer
+ }
+
+ @throws[IOException]
+ def encode(outputStream: OutputStream, serializer: SerializerInstance):
Unit = {
+ val out = new DataOutputStream(outputStream)
+ // Write taskId
+ out.writeLong(taskId)
+
+ // Write attemptNumber
+ out.writeInt(attemptNumber)
+
+ // Write executorId
+ out.writeUTF(executorId)
+
+ // Write name
+ out.writeUTF(name)
- // Because ByteBuffers are not serializable, wrap the task in a
SerializableBuffer
- private val buffer = new SerializableBuffer(_serializedTask)
+ // Write index
+ out.writeInt(index)
- def serializedTask: ByteBuffer = buffer.value
+ // Write taskFiles
+ out.writeInt(taskFiles.size)
+ for ((name, timestamp) <- taskFiles) {
+ out.writeUTF(name)
+ out.writeLong(timestamp)
+ }
+
+ // Write taskJars
+ out.writeInt(taskJars.size)
+ for ((name, timestamp) <- taskJars) {
+ out.writeUTF(name)
+ out.writeLong(timestamp)
+ }
+
+ // Write the task properties separately so it is available before full
task deserialization.
+ val taskProps = task_.localProperties
+ val propertyNames = taskProps.stringPropertyNames
+ out.writeInt(propertyNames.size())
+ propertyNames.asScala.foreach { key =>
+ val value = taskProps.getProperty(key)
+ out.writeUTF(key)
+ out.writeUTF(value)
+ }
+
+ // Write the task itself and finish
+ val serializeStream = serializer.serializeStream(out)
+ serializeStream.writeValue(task_)
+ serializeStream.flush()
+ }
+
+ def task(ser: SerializerInstance): Task[_] = {
+ if (task_ == null) {
+ val deserializeStream = ser.deserializeStream(taskBytes)
+ task_ = deserializeStream.readValue[Task[_]]()
+ task_.localProperties = taskProps
+ deserializeStream.close()
+ taskProps = null
+ taskBytes = null
+ }
+ task_
--- End diff --
it looks like `task()` is only called once, so the point of this is just to
allow aggressive nulling of `taskProps` and `taskBytes` for GC purposes, right?
Nulling `taskProps` doesn't seem to have any value -- it should always
still be present on `task_.localProperties`.
I suppose `taskBytes` can be somewhat large, so it's worth this complexity?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]