Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/42#discussion_r10641806
  
    --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
    @@ -69,32 +70,47 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
              // If we got here, we have to load the split
              logInfo("Partition %s not found, computing it".format(key))
              val computedValues = rdd.computeOrReadCheckpoint(split, context)
    +
              // Persist the result, so long as the task is not running locally
              if (context.runningLocally) { return computedValues }
    -          if (storageLevel.useDisk && !storageLevel.useMemory) {
    -            // In the case that this RDD is to be persisted using DISK_ONLY
    -            // the iterator will be passed directly to the blockManager (rather then
    -            // caching it to an ArrayBuffer first), then the resulting block data iterator
    -            // will be passed back to the user. If the iterator generates a lot of data,
    -            // this means that it doesn't all have to be held in memory at one time.
    -            // This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
    -            // blocks aren't dropped by the block store before enabling that.
    -            blockManager.put(key, computedValues, storageLevel, tellMaster = true)
    -            return blockManager.get(key) match {
    -              case Some(values) =>
    -                return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
    -              case None =>
    -                logInfo("Failure to store %s".format(key))
    -                throw new Exception("Block manager failed to return persisted valued")
    +
    +          // Keep track of blocks with updated statuses
    +          var updatedBlocks = Seq[(BlockId, BlockStatus)]()
    +          val returnValue: Iterator[T] = {
    +            if (storageLevel.useDisk && !storageLevel.useMemory) {
    +              /* In the case that this RDD is to be persisted using DISK_ONLY
    +               * the iterator will be passed directly to the blockManager (rather then
    +               * caching it to an ArrayBuffer first), then the resulting block data iterator
    +               * will be passed back to the user. If the iterator generates a lot of data,
    +               * this means that it doesn't all have to be held in memory at one time.
    +               * This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
    +               * blocks aren't dropped by the block store before enabling that. */
    +              updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
    +              blockManager.get(key) match {
    +                case Some(values) =>
    +                  new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
    +                case None =>
    +                  logInfo("Failure to store %s".format(key))
    +                  throw new Exception("Block manager failed to return persisted valued")
    +              }
    +            } else {
    +              // In this case the RDD is cached to an array buffer. This will save the results
    +              // if we're dealing with a 'one-time' iterator
    +              val elements = new ArrayBuffer[Any]
    +              elements ++= computedValues
    +              updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
    +              elements.iterator.asInstanceOf[Iterator[T]]
                 }
    -          } else {
    -            // In this case the RDD is cached to an array buffer. This will save the results
    -            // if we're dealing with a 'one-time' iterator
    -            val elements = new ArrayBuffer[Any]
    -            elements ++= computedValues
    -            blockManager.put(key, elements, storageLevel, tellMaster = true)
    -            return elements.iterator.asInstanceOf[Iterator[T]]
              }
    +
    +          // Update task metrics to include any blocks whose storage status is updated
    +          val metrics = context.taskMetrics
    --- End diff --
    
    Actually - is there any case where the metrics could be null here?
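    If a null is possible, one way to make the update safe is a defensive guard around the metrics access. A minimal sketch of that idea, using simplified stand-in classes (`TaskMetrics`, `TaskContext`, and `recordUpdatedBlocks` below are hypothetical illustrations, not Spark's actual API):
    
    ```scala
    // Hypothetical, simplified stand-ins for Spark's TaskContext/TaskMetrics,
    // used only to illustrate guarding against a null taskMetrics field.
    case class BlockStatus(memSize: Long, diskSize: Long)
    
    class TaskMetrics {
      var updatedBlocks: Option[Seq[(String, BlockStatus)]] = None
    }
    
    // taskMetrics may be null, e.g. if a bare context is constructed in a test.
    class TaskContext(val taskMetrics: TaskMetrics)
    
    // Records updated block statuses only when metrics are actually present;
    // returns whether the update happened.
    def recordUpdatedBlocks(
        context: TaskContext,
        updatedBlocks: Seq[(String, BlockStatus)]): Boolean = {
      // Option(x) is None when x is null, so this skips the update
      // instead of throwing a NullPointerException.
      Option(context.taskMetrics) match {
        case Some(metrics) =>
          metrics.updatedBlocks = Some(updatedBlocks)
          true
        case None =>
          false
      }
    }
    ```
    
    With that guard, `recordUpdatedBlocks(new TaskContext(null), Seq.empty)` simply returns `false` rather than failing the task.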

