Github user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/16053#discussion_r94466707
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -17,27 +17,123 @@
package org.apache.spark.scheduler
+import java.io.{DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
+import java.util.Properties
-import org.apache.spark.util.SerializableBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, Map}
+
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
/**
* Description of a task that gets passed onto executors to be executed, usually created by
- * `TaskSetManager.resourceOffer`.
+ * `TaskSetManager.resourceOffer`.
+ *
+ * TaskDescriptions and the associated Task need to be serialized carefully for two reasons:
+ *
+ * (1) When a TaskDescription is received by an Executor, the Executor needs to first get the
+ * list of JARs and files and add these to the classpath, and set the properties, before
+ * deserializing the Task object (serializedTask). This is why the Properties are included
+ * in the TaskDescription, even though they're also in the serialized task.
+ * (2) Because a TaskDescription is serialized and sent to an executor for each task, efficient
+ * serialization (both in terms of serialization time and serialized buffer size) is
+ * important. For this reason, we serialize TaskDescriptions ourselves with the
+ * TaskDescription.encode and TaskDescription.decode methods. This results in a smaller
+ * serialized size because it avoids serializing unnecessary fields in the Map objects
+ * (which can introduce significant overhead when the maps are small).
*/
private[spark] class TaskDescription(
val taskId: Long,
val attemptNumber: Int,
val executorId: String,
val name: String,
val index: Int, // Index within this task's TaskSet
- _serializedTask: ByteBuffer)
- extends Serializable {
+ val addedFiles: Map[String, Long],
+ val addedJars: Map[String, Long],
+ val properties: Properties,
+ val serializedTask: ByteBuffer) {
- // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
- private val buffer = new SerializableBuffer(_serializedTask)
+ override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
+}
- def serializedTask: ByteBuffer = buffer.value
+private[spark] object TaskDescription {
+ def encode(taskDescription: TaskDescription): ByteBuffer = {
+ val bytesOut = new ByteBufferOutputStream(4096)
+ val dataOut = new DataOutputStream(bytesOut)
- override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
+ dataOut.writeLong(taskDescription.taskId)
+ dataOut.writeInt(taskDescription.attemptNumber)
+ dataOut.writeUTF(taskDescription.executorId)
+ dataOut.writeUTF(taskDescription.name)
+ dataOut.writeInt(taskDescription.index)
+
+ // Write files.
+ dataOut.writeInt(taskDescription.addedFiles.size)
+ for ((name, timestamp) <- taskDescription.addedFiles) {
+ dataOut.writeUTF(name)
+ dataOut.writeLong(timestamp)
+ }
+
+ // Write jars.
+ dataOut.writeInt(taskDescription.addedJars.size)
+ for ((name, timestamp) <- taskDescription.addedJars) {
+ dataOut.writeUTF(name)
+ dataOut.writeLong(timestamp)
+ }
--- End diff --
Done
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]