GitHub user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16053#discussion_r94463665
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -17,27 +17,123 @@
     
     package org.apache.spark.scheduler
     
    +import java.io.{DataInputStream, DataOutputStream}
     import java.nio.ByteBuffer
    +import java.util.Properties
     
    -import org.apache.spark.util.SerializableBuffer
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.{HashMap, Map}
    +
     +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
     
     /**
      * Description of a task that gets passed onto executors to be executed, usually created by
    - * `TaskSetManager.resourceOffer`.
     + * `TaskSetManager.resourceOffer`.
    + *
     + * TaskDescriptions and the associated Task need to be serialized carefully for two reasons:
     + *
     + *     (1) When a TaskDescription is received by an Executor, the Executor needs to first get the
     + *         list of JARs and files and add these to the classpath, and set the properties, before
     + *         deserializing the Task object (serializedTask). This is why the Properties are included
     + *         in the TaskDescription, even though they're also in the serialized task.
     + *     (2) Because a TaskDescription is serialized and sent to an executor for each task, efficient
     + *         serialization (both in terms of serialization time and serialized buffer size) is
     + *         important. For this reason, we serialize TaskDescriptions ourselves with the
     + *         TaskDescription.encode and TaskDescription.decode methods.  This results in a smaller
     + *         serialized size because it avoids serializing unnecessary fields in the Map objects
     + *         (which can introduce significant overhead when the maps are small).
      */
     private[spark] class TaskDescription(
         val taskId: Long,
         val attemptNumber: Int,
         val executorId: String,
         val name: String,
         val index: Int,    // Index within this task's TaskSet
    -    _serializedTask: ByteBuffer)
    -  extends Serializable {
    +    val addedFiles: Map[String, Long],
    +    val addedJars: Map[String, Long],
    +    val properties: Properties,
    +    val serializedTask: ByteBuffer) {
     
     -  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
    -  private val buffer = new SerializableBuffer(_serializedTask)
     +  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
    +}
     
    -  def serializedTask: ByteBuffer = buffer.value
    +private[spark] object TaskDescription {
    +  def encode(taskDescription: TaskDescription): ByteBuffer = {
    +    val bytesOut = new ByteBufferOutputStream(4096)
    +    val dataOut = new DataOutputStream(bytesOut)
     
     -  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
    +    dataOut.writeLong(taskDescription.taskId)
    +    dataOut.writeInt(taskDescription.attemptNumber)
    +    dataOut.writeUTF(taskDescription.executorId)
    +    dataOut.writeUTF(taskDescription.name)
    +    dataOut.writeInt(taskDescription.index)
    +
    +    // Write files.
    +    dataOut.writeInt(taskDescription.addedFiles.size)
    +    for ((name, timestamp) <- taskDescription.addedFiles) {
    +      dataOut.writeUTF(name)
    +      dataOut.writeLong(timestamp)
    +    }
    +
    +    // Write jars.
    +    dataOut.writeInt(taskDescription.addedJars.size)
    +    for ((name, timestamp) <- taskDescription.addedJars) {
    +      dataOut.writeUTF(name)
    +      dataOut.writeLong(timestamp)
    +    }
    +
    +    // Write properties.
    +    dataOut.writeInt(taskDescription.properties.size())
     +    taskDescription.properties.stringPropertyNames.asScala.foreach { name =>
    +      dataOut.writeUTF(name)
    +      dataOut.writeUTF(taskDescription.properties.getProperty(name))
    +    }
    +
     +    // Write the task. The task is already serialized, so write it directly to the byte
     +    // buffer (this requires first flushing the data output stream, so that all of the
     +    // data has been written from the data output stream to the underlying
     +    // ByteBufferOutputStream before we write the task).
     +    dataOut.flush()
    --- End diff --
    
    Cool thanks for noticing this -- fixed.
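
    For reference, the decode side just mirrors the field order written by
    encode above. A rough sketch of what that looks like (illustrative only;
    the exact code in the PR may handle the trailing task bytes differently):

        import java.io.DataInputStream
        import java.nio.ByteBuffer
        import java.util.Properties

        import scala.collection.mutable.HashMap

        import org.apache.spark.util.ByteBufferInputStream

        def decode(byteBuffer: ByteBuffer): TaskDescription = {
          val dataIn = new DataInputStream(new ByteBufferInputStream(byteBuffer))
          val taskId = dataIn.readLong()
          val attemptNumber = dataIn.readInt()
          val executorId = dataIn.readUTF()
          val name = dataIn.readUTF()
          val index = dataIn.readInt()

          // Read files: a count followed by (name, timestamp) pairs, matching encode.
          val numFiles = dataIn.readInt()
          val taskFiles = new HashMap[String, Long]()
          for (_ <- 0 until numFiles) {
            taskFiles(dataIn.readUTF()) = dataIn.readLong()
          }

          // Read jars, written with the same count-then-pairs layout.
          val numJars = dataIn.readInt()
          val taskJars = new HashMap[String, Long]()
          for (_ <- 0 until numJars) {
            taskJars(dataIn.readUTF()) = dataIn.readLong()
          }

          // Read properties.
          val properties = new Properties()
          val numProperties = dataIn.readInt()
          for (_ <- 0 until numProperties) {
            properties.setProperty(dataIn.readUTF(), dataIn.readUTF())
          }

          // Everything left in the buffer is the already-serialized task.
          // ByteBufferInputStream advances the buffer's position as it reads,
          // so slice() yields the remaining task bytes without copying them
          // (assumption: slice-based handling rather than copying into a
          // fresh buffer).
          val serializedTask = byteBuffer.slice()

          new TaskDescription(taskId, attemptNumber, executorId, name, index,
            taskFiles, taskJars, properties, serializedTask)
        }

    Reading the header with a DataInputStream over the same buffer leaves the
    buffer's position at the start of the task bytes, so the task itself
    doesn't need a length prefix.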

