Github user ericl commented on a diff in the pull request:
https://github.com/apache/spark/pull/15085#discussion_r78654285
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -869,7 +876,19 @@ private[spark] class BlockManager(
blockInfoManager.unlock(blockId)
}
} else {
- blockInfoManager.removeBlock(blockId)
+      // If an exception was thrown then it's possible that the code in `putBody` has already
+      // notified the master about the availability of this block, so we need to send an update
+      // to remove this block location.
+      removeBlockInternal(
+        blockId, tellMaster = tellMaster && putBlockInfo.tellMaster && exceptionWasThrown)
+      if (exceptionWasThrown) {
+        // The `putBody` code may have also added a new block status to TaskMetrics, so we need
+        // to cancel that out by overwriting it with an empty block status. We only do this if
+        // the finally block was entered via an exception because doing this unconditionally would
+        // cause us to send empty block statuses for every block that failed to be cached due to
+        // a memory shortage (which is an expected failure, unlike an uncaught exception).
+        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+      }
--- End diff ---
nit: might be nice to put all the code relevant to exceptionWasThrown in this if
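
For reference, one way the grouping could look — a sketch only, not the actual patch; the behavior of `removeBlockInternal` and `addUpdatedBlockStatusToTaskMetrics` is assumed from the diff above, and the `else` branch's `tellMaster = false` assumes the master should only be updated on the exception path:

```scala
// Hypothetical restructuring: all exceptionWasThrown-related cleanup in one branch.
if (exceptionWasThrown) {
  // Undo the master notification and cancel any block status recorded by `putBody`.
  removeBlockInternal(blockId, tellMaster = tellMaster && putBlockInfo.tellMaster)
  addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
} else {
  removeBlockInternal(blockId, tellMaster = false)
}
```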