Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/19285#discussion_r149379817
--- Diff:
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +259,97 @@ private[spark] class MemoryStore(
// If this task attempt already owns more unroll memory than is
necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
- releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP,
excessUnrollMemory)
+ releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
}
+
if (enoughStorageMemory) {
entries.synchronized {
- entries.put(blockId, entry)
+ entries.put(blockId, createMemoryEntry())
}
logInfo("Block %s stored as values in memory (estimated size %s,
free %s)".format(
blockId, Utils.bytesToString(size),
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
} else {
assert(currentUnrollMemoryForThisTask >=
unrollMemoryUsedByThisBlock,
"released too much unroll memory")
+ Left(unrollMemoryUsedByThisBlock)
+ }
+ } else {
+ Left(unrollMemoryUsedByThisBlock)
+ }
+ }
+
+ /**
+ * Attempt to put the given block in memory store as values.
+ *
+ * It's possible that the iterator is too large to materialize and store
in memory. To avoid
+ * OOM exceptions, this method will gradually unroll the iterator while
periodically checking
+ * whether there is enough free memory. If the block is successfully
materialized, then the
+ * temporary unroll memory used during the materialization is
"transferred" to storage memory,
+ * so we won't acquire more memory than is actually needed to store the
block.
+ *
+ * @return in case of success, the estimated size of the stored data. In
case of failure, return
+ * an iterator containing the values of the block. The returned
iterator will be backed
+ * by the combination of the partially-unrolled block and the
remaining elements of the
+ * original input iterator. The caller must either fully consume
this iterator or call
+ * `close()` on it in order to free the storage memory consumed
by the partially-unrolled
+ * block.
+ */
+ private[storage] def putIteratorAsValues[T](
+ blockId: BlockId,
+ values: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] =
{
+
+ // Underlying vector for unrolling the block
+ var vector = new SizeTrackingVector[T]()(classTag)
+ var arrayValues: Array[T] = null
+ var preciseSize: Long = -1
+
+ def storeValue(value: T): Unit = {
+ vector += value
+ }
+
+ def estimateSize(precise: Boolean): Long = {
+ if (precise) {
+ // We only need the precise size after all values unrolled.
+ arrayValues = vector.toArray
+ preciseSize = SizeEstimator.estimate(arrayValues)
+ vector = null
+ preciseSize
+ } else {
+ vector.estimateSize()
+ }
+ }
+
+ def createMemoryEntry(): MemoryEntry[T] = {
+ // We successfully unrolled the entirety of this block
+ assert(arrayValues != null, "arrayValue shouldn't be null!")
+ assert(preciseSize != -1, "preciseSize shouldn't be -1")
+ val entry = new DeserializedMemoryEntry[T](arrayValues, preciseSize,
classTag)
--- End diff --
Why do we need to create the val `entry`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]