Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2470#discussion_r18136384
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
    @@ -24,22 +24,123 @@ import org.apache.spark.storage.BlockManagerId
     /**
      * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
      * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
    - * The map output sizes are compressed using MapOutputTracker.compressSize.
      */
    -private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
    -  extends Externalizable {
    +private[spark] sealed trait MapStatus {
    +  /** Location where this task was run. */
    +  def location: BlockManagerId
     
    -  def this() = this(null, null)  // For deserialization only
    +  /** Estimated size for the reduce block. */
    +  def getSizeForBlock(reduceId: Int): Long
    +}
    +
    +
    +private[spark] object MapStatus {
    +
    +  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    +    if (uncompressedSizes.length > 2000) {
    +      new LargeMapStatus(loc, uncompressedSizes)
    +    } else {
    +      new DetailedMapStatus(loc, uncompressedSizes)
    +    }
    +  }
    +
    +  private[this] val LOG_BASE = 1.1
    +
    +  /**
    +   * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
    +   * We do this by encoding the log base 1.1 of the size as an integer, which can support
    +   * sizes up to 35 GB with at most 10% error.
    +   */
    +  def compressSize(size: Long): Byte = {
    +    if (size == 0) {
    +      0
    +    } else if (size <= 1L) {
    +      1
    +    } else {
    +      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
    +    }
    +  }
    +
    +  /**
    +   * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
    +   */
    +  def decompressSize(compressedSize: Byte): Long = {
    +    if (compressedSize == 0) {
    +      0
    +    } else {
    +      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
    +    }
    +  }
    +}
    +
    +
    +/**
    + * A [[MapStatus]] implementation that tracks the size of each block. Block sizes are compressed
    + * into a single byte.
    + *
    + * @param loc location where the task is being executed.
    + * @param compressedSizes size of the blocks, indexed by reduce partition id.
    + */
    +private[spark] class DetailedMapStatus(
    +    private[this] var loc: BlockManagerId,
    +    private[this] var compressedSizes: Array[Byte])
    +  extends MapStatus with Externalizable {
    +
    +  protected def this() = this(null, null.asInstanceOf[Array[Byte]])  // For deserialization only
    +
    +  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
    +    this(loc, uncompressedSizes.map(MapStatus.compressSize))
    +  }
     
    -  def writeExternal(out: ObjectOutput) {
    -    location.writeExternal(out)
    +  override def location: BlockManagerId = loc
    +
    +  override def getSizeForBlock(reduceId: Int): Long = {
    +    MapStatus.decompressSize(compressedSizes(reduceId))
    +  }
    +
    +  override def writeExternal(out: ObjectOutput) {
    +    loc.writeExternal(out)
         out.writeInt(compressedSizes.length)
         out.write(compressedSizes)
       }
     
    -  def readExternal(in: ObjectInput) {
    -    location = BlockManagerId(in)
    -    compressedSizes = new Array[Byte](in.readInt())
    +  override def readExternal(in: ObjectInput) {
    --- End diff --
    
    Do we have a rule for that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to