GitHub user ksakellis commented on a diff in the pull request:
https://github.com/apache/spark/pull/3120#discussion_r20247484
--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -44,7 +44,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
+ val existingMetrics = context.taskMetrics.inputMetrics
+ val prevBytesRead = existingMetrics
+ .filter( _.readMethod == blockResult.inputMetrics.readMethod)
+ .map(_.bytesRead)
+ .getOrElse(0L)
+
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+ context.taskMetrics.inputMetrics.get.bytesRead += prevBytesRead
--- End diff --
Since this code does not change any state in CacheManager itself, it should
not affect the thread safety of the outer object. The real question is whether
multiple threads can call getOrCompute and pass in the same TaskContext (that
is, two threads operating on the same task). I don't think that happens, since
only a single thread operates on each task. Please let me know if I'm missing
something.
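
To make the argument concrete, here is a minimal sketch of the same
read-modify-write, assuming one thread per task. The classes are simplified
stand-ins rather than Spark's real InputMetrics and TaskMetrics, and the names
InputMetricsSketch, recordCachedRead, blockOf and runTasks are hypothetical,
introduced only for illustration.

```scala
// Simplified stand-ins for illustration only; these are NOT Spark's classes.
object InputMetricsSketch {
  final class InputMetrics(val readMethod: String) {
    var bytesRead: Long = 0L
  }

  final class TaskMetrics {
    var inputMetrics: Option[InputMetrics] = None
  }

  // Mirrors the update in the diff: it reads and writes only the metrics
  // object that is passed in, and no state shared between tasks.
  def recordCachedRead(metrics: TaskMetrics, blockMetrics: InputMetrics): Unit = {
    val prevBytesRead = metrics.inputMetrics
      .filter(_.readMethod == blockMetrics.readMethod)
      .map(_.bytesRead)
      .getOrElse(0L)
    metrics.inputMetrics = Some(blockMetrics)
    blockMetrics.bytesRead += prevBytesRead
  }

  // Hypothetical helper: builds the metrics a cached block would report.
  def blockOf(readMethod: String, bytes: Long): InputMetrics = {
    val m = new InputMetrics(readMethod)
    m.bytesRead = bytes
    m
  }

  // One thread per task, each with its own TaskMetrics (the assumption
  // stated in the comment above). Returns the final bytesRead per task.
  def runTasks(numTasks: Int): Seq[Long] = {
    val perTask = Seq.fill(numTasks)(new TaskMetrics)
    val threads = perTask.map { tm =>
      new Thread(new Runnable {
        def run(): Unit = {
          recordCachedRead(tm, blockOf("Memory", 100L))
          recordCachedRead(tm, blockOf("Memory", 50L))
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // join makes each task's writes visible here
    perTask.map(_.inputMetrics.map(_.bytesRead).getOrElse(0L))
  }
}
```

Because every thread touches only its own TaskMetrics, no update is lost and
no locking is needed. If two threads ever shared one TaskMetrics, the filter,
assignment and += steps could interleave, and the accumulated count would no
longer be reliable.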