mridulm commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1117987993
##########
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##########
@@ -180,6 +186,27 @@ private[storage] class BlockInfoManager extends Logging {
//
----------------------------------------------------------------------------------------------
+ private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+ if (trackingCacheVisibility) {
+ invisibleRDDBlocks.synchronized {
+ blockInfoWrappers.containsKey(blockId) &&
!invisibleRDDBlocks.contains(blockId)
+ }
+ } else {
+ // Always be visible if the feature flag is disabled.
+ true
+ }
+ }
+
+ private[spark] def tryMarkBlockAsVisible(blockId: RDDBlockId): Unit = {
+ if (trackingCacheVisibility) {
+ invisibleRDDBlocks.synchronized {
+ if (blockInfoWrappers.containsKey(blockId)) {
+ invisibleRDDBlocks.remove(blockId)
Review Comment:
We dont need the `containsKey` check ...
##########
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##########
@@ -150,6 +150,12 @@ private[storage] class BlockInfoManager extends Logging {
*/
private[this] val blockInfoWrappers = new ConcurrentHashMap[BlockId,
BlockInfoWrapper]
+ /**
+ * Record invisible rdd blocks stored in the block manager, entries will be
removed when blocks
+ * are marked as visible or blocks are removed by [[removeBlock()]].
+ */
+ private[spark] val invisibleRDDBlocks =
ConcurrentHashMap.newKeySet[RDDBlockId]
Review Comment:
All accesses and updates to `invisibleRDDBlocks` are within the
`synchronized` block - make it a regular `HashSet` instead ?
Also, let us make it `private[this]`.
For the test cases which are directly accessing variable right now, we can
add a
```
private[storage] def containsInvisibleRDDBlock(blockId: RDDBlockId):
Boolean = {
invisibleRDDBlocks.synchronized {
invisibleRDDBlocks.contains(blockId)
}
}
```
##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -1325,31 +1328,74 @@ private[spark] class BlockManager(
blockInfoManager.releaseAllLocksForTask(taskAttemptId)
}
+ /**
+ * Retrieve the given rdd block if it exists and is visible, otherwise call
the provided
+ * `makeIterator` method to compute the block, persist it, and return its
values.
+ *
+ * @return either a BlockResult if the block was successfully cached, or an
iterator if the block
+ * could not be cached.
+ */
+ def getOrElseUpdateRDDBlock[T](
+ taskId: Long,
+ blockId: RDDBlockId,
+ level: StorageLevel,
+ classTag: ClassTag[T],
+ makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+ val isCacheVisible = isRDDBlockVisible(blockId)
+ val res = getOrElseUpdate(blockId, level, classTag, makeIterator,
isCacheVisible)
+ if (res.isLeft && !isCacheVisible) {
+ // Block exists but not visible, report taskId -> blockId info to master.
+ master.updateRDDBlockTaskInfo(blockId, taskId)
+ }
+
+ res
+ }
+
/**
* Retrieve the given block if it exists, otherwise call the provided
`makeIterator` method
* to compute the block, persist it, and return its values.
*
* @return either a BlockResult if the block was successfully cached, or an
iterator if the block
* could not be cached.
*/
- def getOrElseUpdate[T](
+ private def getOrElseUpdate[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
- makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
- // Attempt to read the block from local or remote storage. If it's
present, then we don't need
- // to go through the local-get-or-put path.
- get[T](blockId)(classTag) match {
- case Some(block) =>
- return Left(block)
- case _ =>
- // Need to compute the block.
+ makeIterator: () => Iterator[T],
+ isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {
Review Comment:
Remove default value for `isCacheVisible`
##########
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:
##########
@@ -2266,6 +2270,160 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with PrivateMethodTe
}
}
+ test("SPARK-41497: getOrElseUpdateRDDBlock do compute based on cache
visibility statue") {
+ val store = makeBlockManager(8000, "executor1")
+ val blockId = RDDBlockId(rddId = 1, splitIndex = 1)
+ var computed: Boolean = false
+ val data = Seq(1, 2, 3)
+ val makeIterator = () => {
+ computed = true
+ data.iterator
+ }
+
+ // Cache doesn't exist and is not visible.
+ assert(store.getStatus(blockId).isEmpty &&
!store.isRDDBlockVisible(blockId))
+ val res1 = store.getOrElseUpdateRDDBlock(
+ 1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+ // Put cache successfully and reported block task info.
+ assert(res1.isLeft && computed)
+ verify(master, times(1)).updateRDDBlockTaskInfo(blockId, 1)
+
+ // Cache exists but not visible.
+ computed = false
+ assert(store.getStatus(blockId).nonEmpty &&
!store.isRDDBlockVisible(blockId))
+ val res2 = store.getOrElseUpdateRDDBlock(
+ 1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+ // Load cache successfully and reported block task info.
+ assert(res2.isLeft && computed)
+ verify(master, times(2)).updateRDDBlockTaskInfo(blockId, 1)
Review Comment:
Note:
This specific check is not very accurate ... it is the current code's
implementation detail. and not the behavior we want to test for. (I am fine
with leaving this check here for now though !)
For example, we can skip reporting to master if the taskId has computed a
particular block already (especially if it is in the invisible set). In actual
spark usage, this cannot happen - and so this is an artifact of test cases IMO.
A better check would be to test for visibility is false, and computed ==
true (which we do already) - which is the behavior we want to test for.
##########
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##########
@@ -139,7 +139,7 @@ private[storage] object BlockInfo {
*
* This class is thread-safe.
*/
-private[storage] class BlockInfoManager extends Logging {
+private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean =
true) extends Logging {
Review Comment:
Make it `false` by default ?
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2468,4 +2468,15 @@ package object config {
.version("3.4.0")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+ ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+ .internal()
+ .doc("Set to be true to enabled RDD cache block's visibility status.
Once it's enabled," +
+ " a RDD cache block can be used only when it's marked as visible. And
a RDD block will be" +
+ " marked as visible only when one of the tasks generating the cache
block finished" +
+ " successfully. This is relevant in context of consistent accumulator
status.")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
Review Comment:
Let us mark it `false` by default ?
Anyway we have to revisit this when fixing the general data correctness
issue, with persisted blocks, we identified as part of this review.
##########
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##########
@@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging {
try {
val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
while (true) {
- val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+ val previous = invisibleRDDBlocks.synchronized {
Review Comment:
Let us if/else around `trackingCacheVisibility`.
```
if (trackingCacheVisibility) {
// existing code in this PR
} else {
blockInfoWrappers.putIfAbsent(blockId, wrapper)
}
```
##########
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##########
@@ -525,6 +562,7 @@ private[storage] class BlockInfoManager extends Logging {
blockInfoWrappers.clear()
readLocksByTask.clear()
writeLocksByTask.clear()
+ invisibleRDDBlocks.clear()
Review Comment:
Note: when changing type of `invisibleRDDBlocks` to `HashSet`, this needs to
be within `invisibleRDDBlocks.synchronized` as well
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -210,6 +220,65 @@ class BlockManagerMasterEndpoint(
case StopBlockManagerMaster =>
context.reply(true)
stop()
+
+ case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+ // This is to report the information that a rdd block(with `blockId`) is
computed
+ // and cached by task(with `taskId`). And this happens right after the
task finished
+ // computing/caching the block only when the block is not visible yet.
And the rdd
+ // block will be marked as visible when the corresponding task finished
successfully.
+ context.reply(updateRDDBlockTaskInfo(blockId, taskId))
+
+ case GetRDDBlockVisibility(blockId) =>
+ // Get the visibility status of a specific rdd block.
+ context.reply(isRDDBlockVisible(blockId))
+
+ case UpdateRDDBlockVisibility(taskId, visible) =>
+ // This is to report the information that whether rdd blocks computed by
task(with `taskId`)
+ // can be turned to be visible. This is reported by DAGScheduler right
after task completes.
+ // If the task finished successfully, rdd blocks can be turned to be
visible, otherwise rdd
+ // blocks' visibility status won't change.
+ context.reply(updateRDDBlockVisibility(taskId, visible))
+ }
+
+ private def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+ if (trackingCacheVisibility) {
+ blockLocations.containsKey(blockId) &&
+ blockLocations.get(blockId).nonEmpty &&
!invisibleRDDBlocks.contains(blockId)
+ } else {
+ // Blocks should always be visible if the feature flag is disabled.
+ true
Review Comment:
This should not happen iirc (here and in the other methods added in this
class) ... could be a precondition check.
But it is fine to include it for now.
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -210,6 +220,65 @@ class BlockManagerMasterEndpoint(
case StopBlockManagerMaster =>
context.reply(true)
stop()
+
+ case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+ // This is to report the information that a rdd block(with `blockId`) is
computed
+ // and cached by task(with `taskId`). And this happens right after the
task finished
+ // computing/caching the block only when the block is not visible yet.
And the rdd
+ // block will be marked as visible when the corresponding task finished
successfully.
+ context.reply(updateRDDBlockTaskInfo(blockId, taskId))
+
+ case GetRDDBlockVisibility(blockId) =>
+ // Get the visibility status of a specific rdd block.
+ context.reply(isRDDBlockVisible(blockId))
+
+ case UpdateRDDBlockVisibility(taskId, visible) =>
+ // This is to report the information that whether rdd blocks computed by
task(with `taskId`)
+ // can be turned to be visible. This is reported by DAGScheduler right
after task completes.
+ // If the task finished successfully, rdd blocks can be turned to be
visible, otherwise rdd
+ // blocks' visibility status won't change.
+ context.reply(updateRDDBlockVisibility(taskId, visible))
+ }
+
+ private def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+ if (trackingCacheVisibility) {
+ blockLocations.containsKey(blockId) &&
+ blockLocations.get(blockId).nonEmpty &&
!invisibleRDDBlocks.contains(blockId)
+ } else {
+ // Blocks should always be visible if the feature flag is disabled.
+ true
+ }
+ }
+
+ private def updateRDDBlockVisibility(taskId: Long, visible: Boolean): Unit =
{
+ if (!trackingCacheVisibility) {
+ // Do nothing if the feature flag is disabled.
+ return
+ }
+
+ if (visible) {
+ tidToRddBlockIds.get(taskId).foreach { blockIds =>
+ blockIds.foreach { blockId =>
+ invisibleRDDBlocks.remove(blockId)
+ // Ask block managers to update the visibility status.
+ val msg = MarkRDDBlockAsVisible(blockId)
+ getLocations(blockId).flatMap(blockManagerInfo.get).foreach {
managerInfo =>
+ managerInfo.storageEndpoint.ask[Unit](msg)
+ }
+ }
+ }
+ }
Review Comment:
Note:
When `visible == false`, the task had failed - and we should be asking the
block managers to evict the block.
This can come later, as part of the correctness fix though.
Can you add a TODO for it ? Please include reference to the correctness jira
(for persisted block issue we discussed) in the comment as well. Thx
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -728,7 +800,22 @@ class BlockManagerMasterEndpoint(
}
if (storageLevel.isValid) {
+ val firstBlock = locations.isEmpty
locations.add(blockManagerId)
+
+ blockId.asRDDId.foreach { rddBlockId =>
+ (trackingCacheVisibility, firstBlock) match {
+ case (true, true) =>
+ // Mark as invisible for the first block.
+ invisibleRDDBlocks.add(rddBlockId)
+ case (true, false) if !invisibleRDDBlocks.contains(rddBlockId) =>
+ // If the rdd block is already visible, ask storage manager to
update the visibility
+ // status.
+ blockManagerInfo(blockManagerId).storageEndpoint
+ .ask[Unit](MarkRDDBlockAsVisible(rddBlockId))
Review Comment:
To understand better, this is to handle case where block was replicated to
another node - but in meantime the original task completed successfully (and so
not invisible anymore).
So ensure that the replica is also marked as visible, right ?
Any other scenario where this is relevant ?
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala:
##########
@@ -81,6 +81,8 @@ class BlockManagerStorageEndpoint(
case ReplicateBlock(blockId, replicas, maxReplicas) =>
context.reply(blockManager.replicateBlock(blockId, replicas.toSet,
maxReplicas))
+ case MarkRDDBlockAsVisible(blockId) =>
Review Comment:
Can you add a comment for this as well ? Similar to
`UpdateRDDBlockTaskInfo`, etc in `BlockManagerMasterEndpoint`
##########
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:
##########
@@ -2266,6 +2270,160 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with PrivateMethodTe
}
}
+ test("SPARK-41497: getOrElseUpdateRDDBlock do compute based on cache
visibility statue") {
+ val store = makeBlockManager(8000, "executor1")
+ val blockId = RDDBlockId(rddId = 1, splitIndex = 1)
+ var computed: Boolean = false
+ val data = Seq(1, 2, 3)
+ val makeIterator = () => {
+ computed = true
+ data.iterator
+ }
+
+ // Cache doesn't exist and is not visible.
+ assert(store.getStatus(blockId).isEmpty &&
!store.isRDDBlockVisible(blockId))
+ val res1 = store.getOrElseUpdateRDDBlock(
+ 1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+ // Put cache successfully and reported block task info.
+ assert(res1.isLeft && computed)
+ verify(master, times(1)).updateRDDBlockTaskInfo(blockId, 1)
+
+ // Cache exists but not visible.
+ computed = false
+ assert(store.getStatus(blockId).nonEmpty &&
!store.isRDDBlockVisible(blockId))
Review Comment:
Let us move this assertion to right after `verify` above ?
Essentially, that is the last check for the "Cache doesn't exist and is not
visible." case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]