ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028179


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -210,6 +220,65 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
+
+    case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+      // This is to report the information that a rdd block(with `blockId`) is 
computed
+      // and cached by task(with `taskId`). And this happens right after the 
task finished
+      // computing/caching the block only when the block is not visible yet. 
And the rdd
+      // block will be marked as visible when the corresponding task finished 
successfully.
+      context.reply(updateRDDBlockTaskInfo(blockId, taskId))
+
+    case GetRDDBlockVisibility(blockId) =>
+      // Get the visibility status of a specific rdd block.
+      context.reply(isRDDBlockVisible(blockId))
+
+    case UpdateRDDBlockVisibility(taskId, visible) =>
+      // This is to report the information that whether rdd blocks computed by 
task(with `taskId`)
+      // can be turned to be visible. This is reported by DAGScheduler right 
after task completes.
+      // If the task finished successfully, rdd blocks can be turned to be 
visible, otherwise rdd
+      // blocks' visibility status won't change.
+      context.reply(updateRDDBlockVisibility(taskId, visible))
+  }
+
+  private def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    if (trackingCacheVisibility) {
+      blockLocations.containsKey(blockId) &&
+        blockLocations.get(blockId).nonEmpty && 
!invisibleRDDBlocks.contains(blockId)
+    } else {
+      // Blocks should always be visible if the feature flag is disabled.
+      true
+    }
+  }
+
+  private def updateRDDBlockVisibility(taskId: Long, visible: Boolean): Unit = 
{
+    if (!trackingCacheVisibility) {
+      // Do nothing if the feature flag is disabled.
+      return
+    }
+
+    if (visible) {
+      tidToRddBlockIds.get(taskId).foreach { blockIds =>
+        blockIds.foreach { blockId =>
+          invisibleRDDBlocks.remove(blockId)
+          // Ask block managers to update the visibility status.
+          val msg = MarkRDDBlockAsVisible(blockId)
+          getLocations(blockId).flatMap(blockManagerInfo.get).foreach { 
managerInfo =>
+            managerInfo.storageEndpoint.ask[Unit](msg)
+          }
+        }
+      }
+    }

Review Comment:
   Updated, and this is the jira: 
https://issues.apache.org/jira/browse/SPARK-42582



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to