ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028179
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -210,6 +220,65 @@ class BlockManagerMasterEndpoint(
case StopBlockManagerMaster =>
context.reply(true)
stop()
+
+ case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+ // This is to report the information that a rdd block(with `blockId`) is computed
+ // and cached by task(with `taskId`). And this happens right after the task finished
+ // computing/caching the block only when the block is not visible yet. And the rdd
+ // block will be marked as visible when the corresponding task finished successfully.
+ context.reply(updateRDDBlockTaskInfo(blockId, taskId))
+
+ case GetRDDBlockVisibility(blockId) =>
+ // Get the visibility status of a specific rdd block.
+ context.reply(isRDDBlockVisible(blockId))
+
+ case UpdateRDDBlockVisibility(taskId, visible) =>
+ // This is to report the information that whether rdd blocks computed by task(with `taskId`)
+ // can be turned to be visible. This is reported by DAGScheduler right after task completes.
+ // If the task finished successfully, rdd blocks can be turned to be visible, otherwise rdd
+ // blocks' visibility status won't change.
+ context.reply(updateRDDBlockVisibility(taskId, visible))
+ }
+
+ private def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+ if (trackingCacheVisibility) {
+ blockLocations.containsKey(blockId) &&
+ blockLocations.get(blockId).nonEmpty && !invisibleRDDBlocks.contains(blockId)
+ } else {
+ // Blocks should always be visible if the feature flag is disabled.
+ true
+ }
+ }
+
+ private def updateRDDBlockVisibility(taskId: Long, visible: Boolean): Unit = {
+ if (!trackingCacheVisibility) {
+ // Do nothing if the feature flag is disabled.
+ return
+ }
+
+ if (visible) {
+ tidToRddBlockIds.get(taskId).foreach { blockIds =>
+ blockIds.foreach { blockId =>
+ invisibleRDDBlocks.remove(blockId)
+ // Ask block managers to update the visibility status.
+ val msg = MarkRDDBlockAsVisible(blockId)
+ getLocations(blockId).flatMap(blockManagerInfo.get).foreach { managerInfo =>
+ managerInfo.storageEndpoint.ask[Unit](msg)
+ }
+ }
+ }
+ }
Review Comment:
Updated. Here is the JIRA ticket:
https://issues.apache.org/jira/browse/SPARK-42582
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]