Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19285#discussion_r163749987
--- Diff:
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +641,87 @@ private[spark] class MemoryStore(
}
}
/**
 * Finalizes a fully-unrolled block: reports the exact size of the data and
 * materializes the [[MemoryEntry]] to be stored in the memory store.
 */
private trait ValuesBuilder[T] {
  /** Builds the memory entry holding the unrolled data. */
  def build(): MemoryEntry[T]

  /** Exact (not estimated) size in bytes of the data `build()` will store. */
  def preciseSize: Long
}
+
/**
 * Accumulates a block's values while it is being unrolled, tracking an
 * estimate of the memory consumed so far.
 */
private trait ValuesHolder[T] {
  /** Appends a single value to this holder. */
  def storeValue(value: T): Unit

  /** Current approximate size in bytes of everything stored so far. */
  def estimatedSize(): Long

  /**
   * Returns a builder that finalizes the held values. Implementations may
   * invalidate the holder at this point, so no further `storeValue` calls
   * should be made afterwards.
   */
  def getBuilder(): ValuesBuilder[T]
}
+
+/**
+ * A holder for storing the deserialized values.
+ */
/**
 * A [[ValuesHolder]] that keeps values in deserialized (object) form, backed
 * by a size-tracking vector while unrolling is in progress.
 */
private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
  // Accumulates the block's values and tracks an estimate of their footprint.
  var vector = new SizeTrackingVector[T]()(classTag)
  // Frozen snapshot of the values; null until getBuilder() is called.
  var arrayValues: Array[T] = null

  override def storeValue(value: T): Unit = vector += value

  override def estimatedSize(): Long = vector.estimateSize()

  override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] {
    // The whole block was unrolled successfully: snapshot the contents into
    // an array and drop the vector reference so it can be reclaimed. After
    // this point the holder must not receive further values.
    arrayValues = vector.toArray
    vector = null

    // Computed once when the builder is created, then reused by build().
    override val preciseSize: Long = SizeEstimator.estimate(arrayValues)

    override def build(): MemoryEntry[T] =
      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
  }
}
+
+/**
+ * A holder for storing the serialized values.
+ */
+private class SerializedValuesHolder[T](
+ blockId: BlockId,
+ chunkSize: Int,
+ classTag: ClassTag[T],
+ memoryMode: MemoryMode,
+ serializerManager: SerializerManager) extends ValuesHolder[T] {
+ val allocator = memoryMode match {
+ case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+ case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+ }
+
+ val redirectableStream = new RedirectableOutputStream
+ val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
+ redirectableStream.setOutputStream(bbos)
+ val serializationStream: SerializationStream = {
+ val autoPick = !blockId.isInstanceOf[StreamBlockId]
+ val ser = serializerManager.getSerializer(classTag,
autoPick).newInstance()
+ ser.serializeStream(serializerManager.wrapForCompression(blockId,
redirectableStream))
+ }
+
+ override def storeValue(value: T): Unit = {
+ serializationStream.writeObject(value)(classTag)
+ }
+
+ override def estimatedSize(): Long = {
+ bbos.size
+ }
+
+ override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] {
+ // We successfully unrolled the entirety of this block
+ serializationStream.close()
+
+ override val preciseSize: Long = bbos.size
--- End diff --
this can be a `def`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]