GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/15036#discussion_r78266633
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
---
@@ -1316,21 +1303,31 @@ private[spark] class BlockManager(
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
case Some(info) =>
- // Removals are idempotent in disk store and memory store. At
worst, we get a warning.
- val removedFromMemory = memoryStore.remove(blockId)
- val removedFromDisk = diskStore.remove(blockId)
- if (!removedFromMemory && !removedFromDisk) {
- logWarning(s"Block $blockId could not be removed as it was not
found in either " +
- "the disk, memory, or external block store")
- }
- blockInfoManager.removeBlock(blockId)
- val removeBlockStatus = getCurrentBlockStatus(blockId, info)
- if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, info, removeBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId ->
removeBlockStatus)
- }
+ removeBlockInternal(blockId, tellMaster = tellMaster &&
info.tellMaster)
+ addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+ }
+ }
+
+ /**
+ * Internal version of [[removeBlock()]] which assumes that the caller
already holds a write
+ * lock on the block.
+ */
+ private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean):
Unit = {
+ // Removals are idempotent in disk store and memory store. At worst,
we get a warning.
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ if (!removedFromMemory && !removedFromDisk) {
+ logWarning(s"Block $blockId could not be removed as it was not found
on disk or in memory")
+ }
+ blockInfoManager.removeBlock(blockId)
+ if (tellMaster) {
+ reportBlockStatus(blockId, BlockStatus.empty)
+ }
+ }
+
+ private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status:
BlockStatus): Unit = {
+ Option(TaskContext.get()).foreach { c =>
--- End diff --
One new change: I moved this duplicated logic (recording the updated block status in the
task metrics) into this helper function (see commit c3cc277af163c86cf6c238f475b8374118753198).
This will make it easier to skip the block status update in certain places in my next patch.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it enabled, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]