Github user ConeyLiu commented on a diff in the pull request:
https://github.com/apache/spark/pull/19285#discussion_r163551992
--- Diff:
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
} else {
initialMemoryThreshold.toInt
}
- val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
- redirectableStream.setOutputStream(bbos)
- val serializationStream: SerializationStream = {
- val autoPick = !blockId.isInstanceOf[StreamBlockId]
- val ser = serializerManager.getSerializer(classTag,
autoPick).newInstance()
- ser.serializeStream(serializerManager.wrapForCompression(blockId,
redirectableStream))
- }
-
- // Request enough memory to begin unrolling
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId,
initialMemoryThreshold, memoryMode)
-
- if (!keepUnrolling) {
- logWarning(s"Failed to reserve initial memory threshold of " +
- s"${Utils.bytesToString(initialMemoryThreshold)} for computing
block $blockId in memory.")
- } else {
- unrollMemoryUsedByThisBlock += initialMemoryThreshold
- }
-
- def reserveAdditionalMemoryIfNecessary(): Unit = {
- if (bbos.size > unrollMemoryUsedByThisBlock) {
- val amountToRequest = (bbos.size * memoryGrowthFactor -
unrollMemoryUsedByThisBlock).toLong
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId,
amountToRequest, memoryMode)
- if (keepUnrolling) {
- unrollMemoryUsedByThisBlock += amountToRequest
- }
- }
- }
-
- // Unroll this block safely, checking whether we have exceeded our
threshold
- while (values.hasNext && keepUnrolling) {
- serializationStream.writeObject(values.next())(classTag)
- elementsUnrolled += 1
- if (elementsUnrolled % memoryCheckPeriod == 0) {
- reserveAdditionalMemoryIfNecessary()
- }
- }
- // Make sure that we have enough memory to store the block. By this
point, it is possible that
- // the block's actual memory usage has exceeded the unroll memory by a
small amount, so we
- // perform one final call to attempt to allocate additional memory if
necessary.
- if (keepUnrolling) {
- serializationStream.close()
- if (bbos.size > unrollMemoryUsedByThisBlock) {
- val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId,
amountToRequest, memoryMode)
- if (keepUnrolling) {
- unrollMemoryUsedByThisBlock += amountToRequest
- }
- }
- }
+ val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize,
classTag,
+ memoryMode, serializerManager)
- if (keepUnrolling) {
--- End diff --
I do not understand what you mean, could you explain it more?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]