Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19285#discussion_r163749987
  
    --- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -702,6 +641,87 @@ private[spark] class MemoryStore(
       }
     }
     
    +private trait ValuesBuilder[T] {
    +  def preciseSize: Long
    +  def build(): MemoryEntry[T]
    +}
    +
    +private trait ValuesHolder[T] {
    +  def storeValue(value: T): Unit
    +  def estimatedSize(): Long
    +  def getBuilder(): ValuesBuilder[T]
    +}
    +
    +/**
    + * A holder for storing the deserialized values.
    + */
    +private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends 
ValuesHolder[T] {
    +  // Underlying vector for unrolling the block
    +  var vector = new SizeTrackingVector[T]()(classTag)
    +  var arrayValues: Array[T] = null
    +
    +  override def storeValue(value: T): Unit = {
    +    vector += value
    +  }
    +
    +  override def estimatedSize(): Long = {
    +    vector.estimateSize()
    +  }
    +
    +  override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] {
    +    // We successfully unrolled the entirety of this block
    +    arrayValues = vector.toArray
    +    vector = null
    +
    +    override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
    +
    +    override def build(): MemoryEntry[T] =
    +      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
    +  }
    +}
    +
    +/**
    + * A holder for storing the serialized values.
    + */
    +private class SerializedValuesHolder[T](
    +    blockId: BlockId,
    +    chunkSize: Int,
    +    classTag: ClassTag[T],
    +    memoryMode: MemoryMode,
    +    serializerManager: SerializerManager) extends ValuesHolder[T] {
    +  val allocator = memoryMode match {
    +    case MemoryMode.ON_HEAP => ByteBuffer.allocate _
    +    case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
    +  }
    +
    +  val redirectableStream = new RedirectableOutputStream
    +  val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
    +  redirectableStream.setOutputStream(bbos)
    +  val serializationStream: SerializationStream = {
    +    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    +    val ser = serializerManager.getSerializer(classTag, 
autoPick).newInstance()
    +    ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
    +  }
    +
    +  override def storeValue(value: T): Unit = {
    +    serializationStream.writeObject(value)(classTag)
    +  }
    +
    +  override def estimatedSize(): Long = {
    +    bbos.size
    +  }
    +
    +  override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] {
    +    // We successfully unrolled the entirety of this block
    +    serializationStream.close()
    +
    +    override val preciseSize: Long = bbos.size
    --- End diff --
    
    could this be a `def` instead of a `val`? Once `serializationStream.close()` has run, `bbos.size` is fixed, so a `def` would return the same value without storing an extra field.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to