Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19285#discussion_r163743072
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -346,85 +350,24 @@ private[spark] class MemoryStore(
         } else {
           initialMemoryThreshold.toInt
         }
    -    val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
    -    redirectableStream.setOutputStream(bbos)
    -    val serializationStream: SerializationStream = {
    -      val autoPick = !blockId.isInstanceOf[StreamBlockId]
     -      val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
     -      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
    -    }
    -
    -    // Request enough memory to begin unrolling
     -    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
    -
    -    if (!keepUnrolling) {
     -      logWarning(s"Failed to reserve initial memory threshold of " +
     -        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    -    } else {
    -      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    -    }
    -
    -    def reserveAdditionalMemoryIfNecessary(): Unit = {
    -      if (bbos.size > unrollMemoryUsedByThisBlock) {
     -        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
     -        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
    -        if (keepUnrolling) {
    -          unrollMemoryUsedByThisBlock += amountToRequest
    -        }
    -      }
    -    }
    -
     -    // Unroll this block safely, checking whether we have exceeded our threshold
    -    while (values.hasNext && keepUnrolling) {
    -      serializationStream.writeObject(values.next())(classTag)
    -      elementsUnrolled += 1
    -      if (elementsUnrolled % memoryCheckPeriod == 0) {
    -        reserveAdditionalMemoryIfNecessary()
    -      }
    -    }
     
     -    // Make sure that we have enough memory to store the block. By this point, it is possible that
     -    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
     -    // perform one final call to attempt to allocate additional memory if necessary.
    -    if (keepUnrolling) {
    -      serializationStream.close()
    -      if (bbos.size > unrollMemoryUsedByThisBlock) {
    -        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
     -        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
    -        if (keepUnrolling) {
    -          unrollMemoryUsedByThisBlock += amountToRequest
    -        }
    -      }
    -    }
     +    val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
     +      memoryMode, serializerManager)
     
    -    if (keepUnrolling) {
    --- End diff --
    
    Thanks for the detailed explanation. I have updated the code; it reads much more clearly now.
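    
    For readers following the diff: the removed block above managed the output
    stream, the serialization stream, and the unroll-size checks inline; the PR
    moves that state behind a values-holder object. Below is a minimal,
    hypothetical sketch of the idea only (plain JDK serialization stands in for
    Spark's SerializerManager and ChunkedByteBufferOutputStream; the class name
    and signatures are illustrative, not Spark's actual API):
    
        import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    
        // Simplified stand-in for the holder introduced by this PR. It owns the
        // streams the removed code managed by hand and exposes the serialized
        // size so the caller can keep doing the unroll-memory bookkeeping.
        class SimpleSerializedValuesHolder[T] {
          private val bytes = new ByteArrayOutputStream()    // vs. ChunkedByteBufferOutputStream
          private val stream = new ObjectOutputStream(bytes) // vs. Spark's SerializationStream
    
          // Serialize one record (the old serializationStream.writeObject call).
          def storeValue(value: T): Unit = stream.writeObject(value.asInstanceOf[AnyRef])
    
          // Bytes written so far (the old direct reads of bbos.size).
          def estimatedSize(): Long = { stream.flush(); bytes.size().toLong }
    
          def close(): Unit = stream.close()
        }
    
    With the state factored out this way, the unroll loop can stay generic: call
    holder.storeValue(values.next()) per element and periodically compare
    holder.estimatedSize() against the reserved unroll memory, which is what lets
    the serialized and deserialized code paths share one loop.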


---
