Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/42#discussion_r10642848
--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -69,32 +70,47 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       // If we got here, we have to load the split
       logInfo("Partition %s not found, computing it".format(key))
       val computedValues = rdd.computeOrReadCheckpoint(split, context)
+
       // Persist the result, so long as the task is not running locally
       if (context.runningLocally) { return computedValues }
-      if (storageLevel.useDisk && !storageLevel.useMemory) {
-        // In the case that this RDD is to be persisted using DISK_ONLY
-        // the iterator will be passed directly to the blockManager (rather than
-        // caching it to an ArrayBuffer first), then the resulting block data iterator
-        // will be passed back to the user. If the iterator generates a lot of data,
-        // this means that it doesn't all have to be held in memory at one time.
-        // This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
-        // blocks aren't dropped by the block store before enabling that.
-        blockManager.put(key, computedValues, storageLevel, tellMaster = true)
-        return blockManager.get(key) match {
-          case Some(values) =>
-            return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
-          case None =>
-            logInfo("Failure to store %s".format(key))
-            throw new Exception("Block manager failed to return persisted value")
+
+      // Keep track of blocks with updated statuses
+      var updatedBlocks = Seq[(BlockId, BlockStatus)]()
+      val returnValue: Iterator[T] = {
+        if (storageLevel.useDisk && !storageLevel.useMemory) {
+          /* In the case that this RDD is to be persisted using DISK_ONLY
+           * the iterator will be passed directly to the blockManager (rather than
+           * caching it to an ArrayBuffer first), then the resulting block data iterator
+           * will be passed back to the user. If the iterator generates a lot of data,
+           * this means that it doesn't all have to be held in memory at one time.
+           * This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
+           * blocks aren't dropped by the block store before enabling that. */
+          updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
+          blockManager.get(key) match {
+            case Some(values) =>
+              new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+            case None =>
+              logInfo("Failure to store %s".format(key))
+              throw new Exception("Block manager failed to return persisted value")
+          }
+        } else {
+          // In this case the RDD is cached to an array buffer. This will save the results
+          // if we're dealing with a 'one-time' iterator
+          val elements = new ArrayBuffer[Any]
+          elements ++= computedValues
+          updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
+          elements.iterator.asInstanceOf[Iterator[T]]
         }
-      } else {
-        // In this case the RDD is cached to an array buffer. This will save the results
-        // if we're dealing with a 'one-time' iterator
-        val elements = new ArrayBuffer[Any]
-        elements ++= computedValues
-        blockManager.put(key, elements, storageLevel, tellMaster = true)
-        return elements.iterator.asInstanceOf[Iterator[T]]
       }
+
+      // Update task metrics to include any blocks whose storage status is updated
+      val metrics = context.taskMetrics
--- End diff --
Ya, ideally that would be better - otherwise we risk masking what is an
actual failure during a legitimate execution. If you can just clean up the
tests, that would be great. If they are creating `TaskContext` objects, we
should include an empty metrics object inside of them.
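The suggestion above could be sketched roughly as follows. All class shapes and
names here are simplified stand-ins, not the actual Spark API: the point is only
that a test helper always supplies an (empty) metrics object when building a
`TaskContext`, so code that reads `context.taskMetrics` never hits a null during
tests.

```scala
// Hypothetical, simplified stand-ins for illustration only.
class TaskMetrics {
  // Stand-in for a field like the updated-block statuses discussed above
  var updatedBlocks: Option[Seq[(String, String)]] = None
}

object TaskMetrics {
  // Empty metrics for tests and freshly created contexts
  def empty: TaskMetrics = new TaskMetrics
}

class TaskContext(
    val stageId: Int,
    val partitionId: Int,
    val attemptId: Long,
    val runningLocally: Boolean = false,
    // Defaulting to empty metrics means no test-created context carries a null
    val taskMetrics: TaskMetrics = TaskMetrics.empty)

// Test helper: every context built for tests gets empty metrics by default
def newTestTaskContext(): TaskContext =
  new TaskContext(stageId = 0, partitionId = 0, attemptId = 0L)
```

With this shape, a test can pass `newTestTaskContext()` anywhere a context is
needed and still exercise the metrics-updating path safely.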
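The non-disk branch in the diff buffers `computedValues` into an `ArrayBuffer`
before returning. A minimal plain-Scala sketch (the `store` buffer below is a
hypothetical stand-in for `blockManager.put`) shows why: the computed partition
arrives as a one-time iterator, so it must be drained into a buffer to be both
stored and handed back to the caller.

```scala
import scala.collection.mutable.ArrayBuffer

// Drain a one-time iterator into a buffer so the values can be stored
// (here: appended to `store`, standing in for blockManager.put) AND
// returned to the caller as a fresh, replayable iterator.
def cacheAndReturn[T](computedValues: Iterator[T],
                      store: ArrayBuffer[Any]): Iterator[T] = {
  val elements = new ArrayBuffer[Any]
  elements ++= computedValues  // consumes the one-time iterator fully
  store ++= elements           // stand-in for handing the block to storage
  elements.iterator.asInstanceOf[Iterator[T]]
}
```

This is exactly the trade-off the diff's comments describe: buffering makes the
iterator replayable but holds the whole partition in memory, whereas the
DISK_ONLY path streams the iterator straight to the block manager.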