[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-04 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1125602608


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2468,4 +2468,15 @@ package object config {
   .version("3.4.0")
   .booleanConf
   .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+    ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+      .internal()
+      .doc("Set to true to enable tracking of an RDD cache block's visibility status. Once " +
+        "enabled, an RDD cache block can be used only when it is marked as visible, and an " +
+        "RDD block will be marked as visible only when one of the tasks generating the " +
+        "cache block has finished successfully. This is relevant in the context of " +
+        "consistent accumulator status.")
+      .version("3.4.0")

Review Comment:
   Sure, will do.
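
   As a minimal sketch (not from the PR), an internal entry like this is typically consumed through `SparkConf`, assuming code that lives under `org.apache.spark` so the `private[spark]` entry is accessible:

   ```
   import org.apache.spark.SparkConf
   import org.apache.spark.internal.config

   val conf = new SparkConf()
     .set(config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED.key, "true")
   // The ConfigEntry carries the type and the default, so the read is a typed Boolean.
   val trackingCacheVisibility: Boolean =
     conf.get(config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED)
   ```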






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1122750718


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,31 +1328,74 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the
+   *         block could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible)
+    if (res.isLeft && !isCacheVisible) {
+      // Block exists but not visible, report taskId -> blockId info to master.
+      master.updateRDDBlockTaskInfo(blockId, taskId)
+    }
+
+    res
+  }
+
   /**
    * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
    * to compute the block, persist it, and return its values.
    *
    * @return either a BlockResult if the block was successfully cached, or an iterator if the
    *         block could not be cached.
    */
-  def getOrElseUpdate[T](
+  private def getOrElseUpdate[T](
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[T],
-      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
-    // Attempt to read the block from local or remote storage. If it's present, then we don't
-    // need to go through the local-get-or-put path.
-    get[T](blockId)(classTag) match {
-      case Some(block) =>
-        return Left(block)
-      case _ =>
-        // Need to compute the block.
+      makeIterator: () => Iterator[T],
+      isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {

Review Comment:
   done, thanks.
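
   For context, a hedged sketch of what a caller of the new API would look like (roughly the shape of `RDD.getOrCompute`; the surrounding plumbing such as `SparkEnv`, `TaskContext`, and the compute closure is assumed, not taken from the PR):

   ```
   import org.apache.spark.{SparkEnv, TaskContext}
   import org.apache.spark.storage.{RDDBlockId, StorageLevel}
   import scala.reflect.classTag

   def readOrComputePartition(rddId: Int, split: Int,
       compute: () => Iterator[Int]): Iterator[Int] = {
     val blockId = RDDBlockId(rddId, split)
     SparkEnv.get.blockManager.getOrElseUpdateRDDBlock(
       TaskContext.get().taskAttemptId(), blockId,
       StorageLevel.MEMORY_ONLY, classTag[Int], compute) match {
       case Left(blockResult) =>
         // Served from a (visible) cache block.
         blockResult.data.asInstanceOf[Iterator[Int]]
       case Right(iter) =>
         // Block could not be cached; consume the computed iterator directly.
         iter
     }
   }
   ```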






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1122750441


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging {
     try {
       val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
       while (true) {
-        val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+        val previous = invisibleRDDBlocks.synchronized {

Review Comment:
   done, thanks.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-25 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028518


##
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:
##
@@ -2266,6 +2270,160 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     }
   }
 
+  test("SPARK-41497: getOrElseUpdateRDDBlock do compute based on cache visibility status") {
+    val store = makeBlockManager(8000, "executor1")
+    val blockId = RDDBlockId(rddId = 1, splitIndex = 1)
+    var computed: Boolean = false
+    val data = Seq(1, 2, 3)
+    val makeIterator = () => {
+      computed = true
+      data.iterator
+    }
+
+    // Cache doesn't exist and is not visible.
+    assert(store.getStatus(blockId).isEmpty && !store.isRDDBlockVisible(blockId))
+    val res1 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Put cache successfully and reported block task info.
+    assert(res1.isLeft && computed)
+    verify(master, times(1)).updateRDDBlockTaskInfo(blockId, 1)
+
+    // Cache exists but not visible.
+    computed = false
+    assert(store.getStatus(blockId).nonEmpty && !store.isRDDBlockVisible(blockId))
+    val res2 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Load cache successfully and reported block task info.
+    assert(res2.isLeft && computed)
+    verify(master, times(2)).updateRDDBlockTaskInfo(blockId, 1)

Review Comment:
   Hi @mridulm, actually this is to make sure that the taskId -> blockId information is reported only when the block is invisible, as this is part of the workflow design. If this doesn't make sense, I can remove the `verify` statement in a new iteration. Thanks.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-25 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028223


##
core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala:
##
@@ -81,6 +81,8 @@ class BlockManagerStorageEndpoint(
     case ReplicateBlock(blockId, replicas, maxReplicas) =>
       context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))
 
+    case MarkRDDBlockAsVisible(blockId) =>

Review Comment:
   Of course. Updated.
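
   A hedged sketch of how the completed handler plausibly looks (the reply body is inferred from the `BlockInfoManager.tryMarkBlockAsVisible` method quoted later in this thread, not copied from the PR):

   ```
   case MarkRDDBlockAsVisible(blockId) =>
     // Drop the block from the executor-local invisible set so later reads
     // can use the cached block without another round trip to the driver.
     context.reply(blockManager.blockInfoManager.tryMarkBlockAsVisible(blockId))
   ```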



##
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:
##
@@ -2266,6 +2270,160 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     }
   }
 
+  test("SPARK-41497: getOrElseUpdateRDDBlock do compute based on cache visibility status") {
+    val store = makeBlockManager(8000, "executor1")
+    val blockId = RDDBlockId(rddId = 1, splitIndex = 1)
+    var computed: Boolean = false
+    val data = Seq(1, 2, 3)
+    val makeIterator = () => {
+      computed = true
+      data.iterator
+    }
+
+    // Cache doesn't exist and is not visible.
+    assert(store.getStatus(blockId).isEmpty && !store.isRDDBlockVisible(blockId))
+    val res1 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Put cache successfully and reported block task info.
+    assert(res1.isLeft && computed)
+    verify(master, times(1)).updateRDDBlockTaskInfo(blockId, 1)
+
+    // Cache exists but not visible.
+    computed = false
+    assert(store.getStatus(blockId).nonEmpty && !store.isRDDBlockVisible(blockId))

Review Comment:
   Thanks, done.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-25 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028179


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -210,6 +220,65 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
+
+    case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+      // This reports that an rdd block (with `blockId`) has been computed and cached by a
+      // task (with `taskId`). This happens right after the task finishes computing/caching
+      // the block, and only when the block is not visible yet. The rdd block will be marked
+      // as visible when the corresponding task finishes successfully.
+      context.reply(updateRDDBlockTaskInfo(blockId, taskId))
+
+    case GetRDDBlockVisibility(blockId) =>
+      // Get the visibility status of a specific rdd block.
+      context.reply(isRDDBlockVisible(blockId))
+
+    case UpdateRDDBlockVisibility(taskId, visible) =>
+      // This reports whether rdd blocks computed by a task (with `taskId`) can be marked as
+      // visible, and is sent by the DAGScheduler right after the task completes. If the task
+      // finished successfully, its rdd blocks can be marked as visible; otherwise the
+      // blocks' visibility status won't change.
+      context.reply(updateRDDBlockVisibility(taskId, visible))
+  }
+
+  private def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    if (trackingCacheVisibility) {
+      blockLocations.containsKey(blockId) &&
+        blockLocations.get(blockId).nonEmpty && !invisibleRDDBlocks.contains(blockId)
+    } else {
+      // Blocks should always be visible if the feature flag is disabled.
+      true
+    }
+  }
+
+  private def updateRDDBlockVisibility(taskId: Long, visible: Boolean): Unit = {
+    if (!trackingCacheVisibility) {
+      // Do nothing if the feature flag is disabled.
+      return
+    }
+
+    if (visible) {
+      tidToRddBlockIds.get(taskId).foreach { blockIds =>
+        blockIds.foreach { blockId =>
+          invisibleRDDBlocks.remove(blockId)
+          // Ask block managers to update the visibility status.
+          val msg = MarkRDDBlockAsVisible(blockId)
+          getLocations(blockId).flatMap(blockManagerInfo.get).foreach { managerInfo =>
+            managerInfo.storageEndpoint.ask[Unit](msg)
+          }
+        }
+      }
+    }

Review Comment:
   Updated, and this is the JIRA: https://issues.apache.org/jira/browse/SPARK-42582
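
   A hedged sketch of the producer side of `UpdateRDDBlockVisibility` described above; the helper name is hypothetical, a `BlockManagerMaster` in scope is assumed, and the semantics follow the comment in the diff (only a successful task flips visibility, `visible = false` leaves the status unchanged):

   ```
   def reportTaskCompletion(master: BlockManagerMaster, taskId: Long, succeeded: Boolean): Unit = {
     // Called by the DAGScheduler right after a task completes.
     master.updateRDDBlockVisibility(taskId, visible = succeeded)
   }
   ```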






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-25 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028046


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -525,6 +562,7 @@ private[storage] class BlockInfoManager extends Logging {
     blockInfoWrappers.clear()
     readLocksByTask.clear()
     writeLocksByTask.clear()
+    invisibleRDDBlocks.clear()

Review Comment:
   Thanks, done.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-25 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028028


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -150,6 +150,12 @@ private[storage] class BlockInfoManager extends Logging {
    */
   private[this] val blockInfoWrappers = new ConcurrentHashMap[BlockId, BlockInfoWrapper]
 
+  /**
+   * Record invisible rdd blocks stored in the block manager; entries will be removed when
+   * blocks are marked as visible or blocks are removed by [[removeBlock()]].
+   */
+  private[spark] val invisibleRDDBlocks = ConcurrentHashMap.newKeySet[RDDBlockId]

Review Comment:
   Makes sense. Thanks, updated.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-25 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118028001


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -180,6 +186,27 @@ private[storage] class BlockInfoManager extends Logging {
 
   // ----------------------------------------------------------------------------------------
 
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    if (trackingCacheVisibility) {
+      invisibleRDDBlocks.synchronized {
+        blockInfoWrappers.containsKey(blockId) && !invisibleRDDBlocks.contains(blockId)
+      }
+    } else {
+      // Always be visible if the feature flag is disabled.
+      true
+    }
+  }
+
+  private[spark] def tryMarkBlockAsVisible(blockId: RDDBlockId): Unit = {
+    if (trackingCacheVisibility) {
+      invisibleRDDBlocks.synchronized {
+        if (blockInfoWrappers.containsKey(blockId)) {
+          invisibleRDDBlocks.remove(blockId)

Review Comment:
   Thanks, you are right. Updated.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-25 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118027972


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2468,4 +2468,15 @@ package object config {
   .version("3.4.0")
   .booleanConf
   .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+    ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+      .internal()
+      .doc("Set to true to enable tracking of an RDD cache block's visibility status. Once " +
+        "enabled, an RDD cache block can be used only when it is marked as visible, and an " +
+        "RDD block will be marked as visible only when one of the tasks generating the " +
+        "cache block has finished successfully. This is relevant in the context of " +
+        "consistent accumulator status.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   Sounds good. Thanks, updated.



##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -139,7 +139,7 @@ private[storage] object BlockInfo {
  *
  * This class is thread-safe.
  */
-private[storage] class BlockInfoManager extends Logging {
+private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = true) extends Logging {

Review Comment:
   done






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-25 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118013777


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -728,7 +800,22 @@ class BlockManagerMasterEndpoint(
     }
 
     if (storageLevel.isValid) {
+      val firstBlock = locations.isEmpty
       locations.add(blockManagerId)
+
+      blockId.asRDDId.foreach { rddBlockId =>
+        (trackingCacheVisibility, firstBlock) match {
+          case (true, true) =>
+            // Mark as invisible for the first block.
+            invisibleRDDBlocks.add(rddBlockId)
+          case (true, false) if !invisibleRDDBlocks.contains(rddBlockId) =>
+            // If the rdd block is already visible, ask storage manager to update the
+            // visibility status.
+            blockManagerInfo(blockManagerId).storageEndpoint
+              .ask[Unit](MarkRDDBlockAsVisible(rddBlockId))

Review Comment:
   > So ensure that the replica is also marked as visible, right ?
   
   Yes, this is one scenario. Another scenario is that once a task fails to fetch the cached block from a remote executor, it will compute and cache the block again.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-25 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1118005755


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging {
     try {
       val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
       while (true) {
-        val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+        val previous = invisibleRDDBlocks.synchronized {
+          val res = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+          if (res == null && trackingCacheVisibility) {
+            // Added to invisible blocks if it doesn't exist before.
+            blockId.asRDDId.foreach(invisibleRDDBlocks.add)

Review Comment:
   Currently we synchronize all the write operations, and also the read operations which need to check both variables. For the other read operations, since the variables won't change, I think a synchronized block may not be necessary.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-21 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1113188852


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -502,7 +536,10 @@ private[storage] class BlockInfoManager extends Logging {
         throw new IllegalStateException(
           s"Task $taskAttemptId called remove() on block $blockId without a write lock")
       } else {
-        blockInfoWrappers.remove(blockId)
+        invisibleRDDBlocks.synchronized {
+          blockInfoWrappers.remove(blockId)

Review Comment:
   Just want to make sure that the state changes for both `blockInfoWrappers` and `invisibleRDDBlocks` are processed in the same synchronized block to avoid potential concurrency issues, since we depend on the state of the two variables to decide whether a block is visible.
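
   A self-contained illustration (simplified, hypothetical types) of the invariant described here: both structures only change under one shared lock, so a reader taking the same lock can never observe a block that is present in `blockInfoWrappers` but has a stale entry in `invisibleRDDBlocks`:

   ```
   import java.util.concurrent.ConcurrentHashMap

   object VisibilityInvariantDemo {
     private val blockInfoWrappers = new ConcurrentHashMap[String, AnyRef]
     private val invisibleRDDBlocks = ConcurrentHashMap.newKeySet[String]

     def putBlock(blockId: String, info: AnyRef): Unit = invisibleRDDBlocks.synchronized {
       if (blockInfoWrappers.putIfAbsent(blockId, info) == null) {
         invisibleRDDBlocks.add(blockId) // new blocks start out invisible
       }
     }

     def removeBlock(blockId: String): Unit = invisibleRDDBlocks.synchronized {
       blockInfoWrappers.remove(blockId)
       invisibleRDDBlocks.remove(blockId) // keep both structures in step
     }

     def isVisible(blockId: String): Boolean = invisibleRDDBlocks.synchronized {
       blockInfoWrappers.containsKey(blockId) && !invisibleRDDBlocks.contains(blockId)
     }
   }
   ```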






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-21 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1113157159


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging {
     try {
       val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
       while (true) {
-        val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+        val previous = invisibleRDDBlocks.synchronized {
+          val res = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+          if (res == null && trackingCacheVisibility) {
+            // Added to invisible blocks if it doesn't exist before.
+            blockId.asRDDId.foreach(invisibleRDDBlocks.add)

Review Comment:
   Currently the way we check whether a block is visible is to check the information from both `blockInfoWrappers` and `invisibleRDDBlocks`, so we need the synchronized block when modifying the state to avoid concurrency issues (`invisibleRDDBlocks` should be updated within the synchronized block once a new item is put into `blockInfoWrappers`). It is easier to have the synchronized block here.
   
   ```
   private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
     if (trackingCacheVisibility) {
       invisibleRDDBlocks.synchronized {
         blockInfoWrappers.containsKey(blockId) && !invisibleRDDBlocks.contains(blockId)
       }
     } else {
       // Always be visible if the feature flag is disabled.
       true
     }
   }
   ```






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-20 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112123963


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -150,6 +150,12 @@ private[storage] class BlockInfoManager extends Logging {
    */
   private[this] val blockInfoWrappers = new ConcurrentHashMap[BlockId, BlockInfoWrapper]
 
+  /**
+   * Record visible rdd blocks stored in the block manager; entries will be removed
+   * by [[removeBlock()]].
+   */
+  private[spark] val visibleRDDBlocks = ConcurrentHashMap.newKeySet[RDDBlockId]

Review Comment:
   Sounds good, will make the change. Thanks.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-20 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112123512


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -77,6 +78,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
+  // Mapping from task id to the set of rdd blocks which are generated from the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated at least from one successful task.

Review Comment:
   Thanks, fixed the comments.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-20 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112123108


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -210,6 +219,65 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
+
+    case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+      // This is to report the information that a rdd block(with `blockId`) is computed
+      // and cached by task(with `taskId`). The happens right after the task finished
+      // computing/caching the block only when the block is not visible yet. And the rdd
+      // block will be marked as visible only when the corresponding task finished
+      // successfully.
+      context.reply(updateRDDBlockTaskInfo(blockId, taskId))
+
+    case GetRDDBlockVisibility(blockId) =>
+      // Get the visibility status of a specific rdd block.
+      if (!trackingCacheVisibility) {

Review Comment:
   Sure, updated.



##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -210,6 +219,65 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
+
+    case UpdateRDDBlockTaskInfo(blockId, taskId) =>
+      // This is to report the information that a rdd block(with `blockId`) is computed
+      // and cached by task(with `taskId`). The happens right after the task finished

Review Comment:
   Thanks, updated.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-20 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112122792


##
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:
##
@@ -2266,6 +2270,150 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     }
   }
 
+  test("SPARK-41497: getOrElseUpdateRDDBlock do compute based on cache visibility status") {
+    val store = makeBlockManager(8000, "executor1")
+    val blockId = RDDBlockId(rddId = 1, splitIndex = 1)
+    var computed: Boolean = false
+    val data = Seq(1, 2, 3)
+    val makeIterator = () => {
+      computed = true
+      data.iterator
+    }
+
+    // Cache doesn't exist and is not visible.
+    assert(store.getStatus(blockId).isEmpty && !store.isRDDBlockVisible(blockId))
+    val res1 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Put cache successfully and reported block task info.
+    assert(res1.isLeft && computed)
+    verify(master, times(1)).updateRDDBlockTaskInfo(blockId, 1)
+
+    // Cache exists but not visible.
+    computed = false
+    assert(store.getStatus(blockId).nonEmpty && !store.isRDDBlockVisible(blockId))
+    val res2 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Load cache successfully and reported block task info.
+    assert(res2.isLeft && computed)
+    verify(master, times(2)).updateRDDBlockTaskInfo(blockId, 1)
+
+    // Cache exists and visible.
+    store.blockInfoManager.tryAddVisibleBlock(blockId)
+    computed = false
+    assert(store.getStatus(blockId).nonEmpty && store.isRDDBlockVisible(blockId))
+    val res3 = store.getOrElseUpdateRDDBlock(
+      1, blockId, StorageLevel.MEMORY_ONLY, classTag[Int], makeIterator)
+    // Load cache successfully but do not report block task info.
+    assert(res3.isLeft && !computed)
+    verify(master, times(2)).updateRDDBlockTaskInfo(blockId, 1)
+  }
+
+  test("add block rdd visibility status") {

Review Comment:
   Updated






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-20 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1112113795


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1424,6 +1470,28 @@ private[spark] class BlockManager(
     blockStoreUpdater.save()
   }
 
+  // Check whether a rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    // Cached blocks are always visible if the feature flag is disabled.
+    if (!trackingCacheVisibility) {
+      return true
+    }
+
+    // If the rdd block visibility information not available in the block manager,
+    // asking master for the information.
+    if (blockInfoManager.isRDDBlockVisible(blockId)) {
+      return true
+    }
+
+    if (master.isRDDBlockVisible(blockId)) {
+      // Cache the visibility status if block exists.
+      blockInfoManager.tryAddVisibleBlock(blockId)
+      true

Review Comment:
   Yes, I think so. Even though the current executor doesn't have the cached block, we can still read the cache from a remote executor.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-19 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r227309


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2468,4 +2468,15 @@ package object config {
   .version("3.4.0")
   .booleanConf
   .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+    ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+      .internal()
+      .doc("Set to be true to enabled RDD cache block's visibility status. Once it's enabled," +
+        " a RDD cache block can be used only when it's marked as visible. And a RDD block" +
+        " will be marked as visible only when one of the tasks generating the cache block" +
+        " finished successfully.")

Review Comment:
   Thanks, updated.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-19 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226606


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,14 +1325,47 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the
+   *         block could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    var computed: Boolean = false
+    val getIterator = () => {
+      computed = true
+      makeIterator()
+    }
+
+    val res = getOrElseUpdate(blockId, level, classTag, getIterator)
+    if (res.isLeft && !isCacheVisible) {
+      if (!computed) {
+        // Loaded from cache, re-compute to update accumulators.
+        makeIterator()
+      }

Review Comment:
   Updated.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-19 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226580


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1424,6 +1457,16 @@ private[spark] class BlockManager(
     blockStoreUpdater.save()
   }
 
+  // Check whether a rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    // If the rdd block visibility information not available in the block manager,
+    // asking master for the information.
+    if (blockInfoManager.isRDDBlockVisible(blockId)) {
+      return true
+    }
+    master.isRDDBlockVisible(blockId)

Review Comment:
   Updated, please take a look. Thanks.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-19 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226512


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
+  // Mapping from task id to the set of rdd blocks which are generated from the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated at least from one successful task.
+  private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]

Review Comment:
   Updated. Please take a look. Thanks. cc @Ngone51  @mridulm 






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-19 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226349


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,31 +1328,71 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the
+   *         block could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible)
+    if (res.isLeft && !isCacheVisible) {
+      // Block exists but not visible, report taskId -> blockId info to master.
+      master.updateRDDBlockTaskInfo(blockId, taskId)
+    }
+
+    res
+  }
+
   /**
    * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
    * to compute the block, persist it, and return its values.
    *
    * @return either a BlockResult if the block was successfully cached, or an iterator if the
    *         block could not be cached.
    */
-  def getOrElseUpdate[T](
+  private def getOrElseUpdate[T](
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[T],
-      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
-    // Attempt to read the block from local or remote storage. If it's present, then we don't
-    // need to go through the local-get-or-put path.
-    get[T](blockId)(classTag) match {
-      case Some(block) =>
-        return Left(block)
-      case _ =>
-        // Need to compute the block.
+      makeIterator: () => Iterator[T],
+      isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {
+    // Track whether the data is computed or not, force to do the computation later if need to.
+    // The reason we push the force computing later is that once the executor is decommissioned
+    // we will have a better chance to replicate the cache block because of the
+    // `checkShouldStore` validation when putting a new block.
+    var computed: Boolean = false
+    val iterator = () => {
+      computed = true
+      makeIterator()
+    }
+    if (isCacheVisible) {
+      // Attempt to read the block from local or remote storage. If it's present, then we
+      // don't need to go through the local-get-or-put path.
+      get[T](blockId)(classTag) match {
+        case Some(block) =>
+          return Left(block)
+        case _ =>
+          // Need to compute the block.
+      }
     }
+
     // Initially we hold no locks on this block.
-    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
+    doPutIterator(blockId, iterator, level, classTag, keepReadLock = true) match {
       case None =>
         // doPut() didn't hand work back to us, so the block already existed or was
         // successfully stored. Therefore, we now hold a read lock on the block.
+        if (!isCacheVisible && !computed) {
+          // Force compute to report accumulator updates.
+          makeIterator()

Review Comment:
   Thanks for pointing this out. Updated and added a UT for this.



##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1787,6 +1792,12 @@ private[spark] class DAGScheduler(
       case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)
       case _ =>
     }
+    if (trackingCacheVisibility) {
+      // Update rdd blocks' visibility status.
+      blockManagerMaster.updateRDDBlockVisibility(

Review Comment:
   Updated






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-19 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r226310


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -210,6 +219,51 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
      context.reply(true)
      stop()
+

Review Comment:
   Added some comments here, please take a look. Thanks.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-17 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1110773664


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,31 +1328,71 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the
+   *         block could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible)
+    if (res.isLeft && !isCacheVisible) {
+      // Block exists but not visible, report taskId -> blockId info to master.
+      master.updateRDDBlockTaskInfo(blockId, taskId)
+    }
+
+    res
+  }
+
   /**
    * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
    * to compute the block, persist it, and return its values.
    *
    * @return either a BlockResult if the block was successfully cached, or an iterator if the
    *         block could not be cached.
    */
-  def getOrElseUpdate[T](
+  private def getOrElseUpdate[T](
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[T],
-      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
-    // Attempt to read the block from local or remote storage. If it's present, then we don't
-    // need to go through the local-get-or-put path.
-    get[T](blockId)(classTag) match {
-      case Some(block) =>
-        return Left(block)
-      case _ =>
-        // Need to compute the block.
+      makeIterator: () => Iterator[T],
+      isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {
+    // Track whether the data is computed or not, force to do the computation later if need to.
+    // The reason we push the force computing later is that once the executor is decommissioned
+    // we will have a better chance to replicate the cache block because of the
+    // `checkShouldStore` validation when putting a new block.
+    var computed: Boolean = false
+    val iterator = () => {
+      computed = true
+      makeIterator()
+    }
+    if (isCacheVisible) {
+      // Attempt to read the block from local or remote storage. If it's present, then we
+      // don't need to go through the local-get-or-put path.
+      get[T](blockId)(classTag) match {
+        case Some(block) =>
+          return Left(block)
+        case _ =>
+          // Need to compute the block.
+      }
     }
+
     // Initially we hold no locks on this block.
-    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
+    doPutIterator(blockId, iterator, level, classTag, keepReadLock = true) match {
       case None =>
         // doPut() didn't hand work back to us, so the block already existed or was
         // successfully stored. Therefore, we now hold a read lock on the block.
+        if (!isCacheVisible && !computed) {
+          // Force compute to report accumulator updates.

Review Comment:
   Yes, the recomputation is only for updating accumulators. The result should be the same unless the computation is indeterminate.
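
   An illustrative sketch (hypothetical job, assumes an active `SparkContext` named `sc`) of why this recomputation matters: the driver discards accumulator updates from a failed attempt, but the cache block that attempt wrote survives, so a retry served from that cache would never re-fire `acc.add` and the accumulator would undercount:

   ```
   val acc = sc.longAccumulator("rows")
   val cached = sc.parallelize(1 to 100, 4).map { x => acc.add(1); x }.cache()
   cached.count() // normally acc.value == 100 after the first materialization;
                  // if the caching attempt failed, its updates were dropped and
                  // only a recompute on retry can restore the count.
   ```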






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-15 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1107261503


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1424,6 +1457,16 @@ private[spark] class BlockManager(
     blockStoreUpdater.save()
   }
 
+  // Check whether a rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+    // If the rdd block visibility information not available in the block manager,
+    // asking master for the information.
+    if (blockInfoManager.isRDDBlockVisible(blockId)) {
+      return true
+    }
+    master.isRDDBlockVisible(blockId)

Review Comment:
   > You are right, the current PR is handling it on a second read ... Since we are already
   > checking for blockInfoManager.isRDDBlockVisible(blockId) first. This should cover the
   > case of (1) - and we will always query in case block is available, and we have to
   > distinguish (2). (2.1) would be an optimization we can attempt later on.
   
   Thanks @mridulm. Just want to clarify: are you suggesting that we can also cache the state got from the driver when it's already visible? I think we can update the cached state if the block exists in the executor after getting the result from the driver/master.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-15 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1107228885


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1787,6 +1792,12 @@ private[spark] class DAGScheduler(
       case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)
       case _ =>
     }
+    if (trackingCacheVisibility) {
+      // Update rdd blocks' visibility status.
+      blockManagerMaster.updateRDDBlockVisibility(

Review Comment:
   Yes, an async call should be fine. Will make the change, thanks.
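
   A hedged sketch of the non-blocking variant agreed on here, assuming `BlockManagerMaster`'s usual `driverEndpoint: RpcEndpointRef` is in scope; the returned `Future` is not awaited, so task-completion handling in the DAGScheduler is not blocked on the RPC:

   ```
   def updateRDDBlockVisibility(taskId: Long, visible: Boolean): Unit = {
     // Fire the RPC asynchronously; visibility tracking is best-effort here.
     driverEndpoint.ask[Unit](UpdateRDDBlockVisibility(taskId, visible))
   }
   ```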






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-15 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1107226569


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,31 +1328,71 @@ private[spark] class BlockManager(
     blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the
+   *         block could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+      taskId: Long,
+      blockId: RDDBlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    val isCacheVisible = isRDDBlockVisible(blockId)
+    val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible)
+    if (res.isLeft && !isCacheVisible) {
+      // Block exists but not visible, report taskId -> blockId info to master.
+      master.updateRDDBlockTaskInfo(blockId, taskId)
+    }
+
+    res
+  }
+
   /**
    * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
    * to compute the block, persist it, and return its values.
    *
    * @return either a BlockResult if the block was successfully cached, or an iterator if the
    *         block could not be cached.
    */
-  def getOrElseUpdate[T](
+  private def getOrElseUpdate[T](
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[T],
-      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
-    // Attempt to read the block from local or remote storage. If it's present, then we don't
-    // need to go through the local-get-or-put path.
-    get[T](blockId)(classTag) match {
-      case Some(block) =>
-        return Left(block)
-      case _ =>
-        // Need to compute the block.
+      makeIterator: () => Iterator[T],
+      isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {
+    // Track whether the data is computed or not, force to do the computation later if need to.
+    // The reason we push the force computing later is that once the executor is decommissioned
+    // we will have a better chance to replicate the cache block because of the
+    // `checkShouldStore` validation when putting a new block.
+    var computed: Boolean = false
+    val iterator = () => {
+      computed = true
+      makeIterator()
+    }
+    if (isCacheVisible) {
+      // Attempt to read the block from local or remote storage. If it's present, then we
+      // don't need to go through the local-get-or-put path.
+      get[T](blockId)(classTag) match {
+        case Some(block) =>
+          return Left(block)
+        case _ =>
+          // Need to compute the block.
+      }
     }
+
     // Initially we hold no locks on this block.
-    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
+    doPutIterator(blockId, iterator, level, classTag, keepReadLock = true) match {
       case None =>
         // doPut() didn't hand work back to us, so the block already existed or was
         // successfully stored. Therefore, we now hold a read lock on the block.
+        if (!isCacheVisible && !computed) {
+          // Force compute to report accumulator updates.

Review Comment:
   Yes, if the block already exists on the node, here we just compute it again without putting the result.
   
   > Could we force put the result iterator even if the block exists in this case?
   
   This would need to modify the locking mechanism a little bit. This is also an issue about indeterminate operations; for determinate operations there is no need to replace the cache here. I am wondering whether the indeterminate framework can cover this case with some other solution?
   



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-15 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1107215072


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
+  // Mapping from task id to the set of rdd blocks which are generated by the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated by at least one successful task.
+  private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]

Review Comment:
   Will try to make the change; the scenario I mentioned above would not be a common case.
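   
   For reference, here is a simplified standalone model of the bookkeeping above (the two map names are borrowed from this diff; everything else is illustrative and much simpler than the real `BlockManagerMasterEndpoint`): a block reported by a running task stays invisible until one generating task succeeds, and the per-task record is cleaned up on every task completion so the map cannot leak.
   
   import scala.collection.mutable
   
   object VisibilityBookkeepingSketch {
     case class RDDBlockId(rddId: Int, splitIndex: Int)
   
     private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
     private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]
   
     // A running task reports that it produced (but has not yet "committed") a block.
     def updateRDDBlockTaskInfo(blockId: RDDBlockId, taskId: Long): Unit =
       tidToRddBlockIds.getOrElseUpdate(taskId, mutable.HashSet.empty[RDDBlockId]) += blockId
   
     // On task completion: a successful task promotes all of its blocks to visible;
     // either way, the per-task entry is removed.
     def onTaskCompleted(taskId: Long, succeeded: Boolean): Unit =
       tidToRddBlockIds.remove(taskId).foreach { blocks =>
         if (succeeded) visibleRDDBlocks ++= blocks
       }
   
     def isRDDBlockVisible(blockId: RDDBlockId): Boolean = visibleRDDBlocks.contains(blockId)
   }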



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-12 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1103799711


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1424,6 +1457,16 @@ private[spark] class BlockManager(
 blockStoreUpdater.save()
   }
 
+  // Check whether an rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+// If the rdd block visibility information is not available in the block manager,
+// ask the master for the information.
+if (blockInfoManager.isRDDBlockVisible(blockId)) {
+  return true
+}
+master.isRDDBlockVisible(blockId)

Review Comment:
   Hi @mridulm, in the current implementation, once a block becomes visible, the driver sends a broadcast message to the executors that store the cached block data, to mark the block as visible.
   
   The state `visibleRDDBlocks` is cached in [BlockInfoManager](https://github.com/apache/spark/pull/39459/files#diff-fdee2ef66ad5bea5323506395b453145c74f47c8da092dcacd34a66190a20a15). It is a kind of cached visibility state on the executor side, but only on executors which have the cached block stored. This is done in a push-based update style.
   
   With the above mechanism, do you think we still need another cache to store the visibility information on the executor, or do we also need to cache the state on executors that do not have the cached block data stored?
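   
   For illustration, a standalone sketch of this push-based scheme (the type and method names here are made up, not Spark's): the driver pushes "mark visible" to executors that store the block, so the common case is a cheap local lookup, and the master is only consulted on a local miss.
   
   import java.util.concurrent.ConcurrentHashMap
   
   class PushBasedVisibilitySketch(askMaster: String => Boolean) {
     // Block ids that the driver has told this executor are visible.
     private val locallyVisible = ConcurrentHashMap.newKeySet[String]()
   
     // Invoked when the driver's "mark visible" broadcast arrives for a locally stored block.
     def markVisible(blockId: String): Unit = { locallyVisible.add(blockId) }
   
     // Fast path: state pushed by the driver; slow path: one round trip to the master.
     def isRDDBlockVisible(blockId: String): Boolean =
       locallyVisible.contains(blockId) || askMaster(blockId)
   }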
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-12 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1103794439


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
+  // Mapping from task id to the set of rdd blocks which are generated by the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated by at least one successful task.
+  private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]

Review Comment:
   > If existing blocks are lost - why would you need that information as they are gone? In other words, how is it different from today's situation (without visibility) - if a block is lost, it is no longer in the system.
   
   Here is an example of the scenario I am trying to describe:
   1. We have a cached block rdd_1_1 which has been successfully cached and marked as visible.
   2. The cached block got lost due to executor loss.
   3. Another task on rdd1 got submitted, and the 1st attempt failed after putting the cache block rdd_1_1. For the 2nd attempt, things could go differently here:
      a. if we still have the visibility status, the 2nd attempt can use the cached block directly;
      b. otherwise, we still need to do the computation.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-08 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1100333545


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
+  // Mapping from task id to the set of rdd blocks which are generated by the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated by at least one successful task.
+  private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]

Review Comment:
   Let me explain this further. If we track visible blocks, it's clear that we always know which blocks are visible.
   
   If we track invisible blocks, a block is considered visible only when at least one copy exists and it is not in the invisible list. So if the existing copies get lost, we lose the information. The next time the cache is re-computed, we have to go through the whole process again (first put the block into the invisible list, then promote it to visible by removing it from the list once a task finishes successfully). Only after repeating that process does the cache become visible again.
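   
   A toy standalone illustration of this asymmetry (single block, no concurrency; all names made up): with invisible-block tracking, "visible" is derived from "a copy exists AND the block is not in the invisible set", so losing the last stored copy also loses the fact that the block was ever promoted.
   
   import scala.collection.mutable
   
   object InvisibleSetPitfallSketch {
     val locations = new mutable.HashSet[String] // block ids with at least one stored copy
     val invisible = new mutable.HashSet[String] // blocks not yet produced by a successful task
   
     def isVisible(id: String): Boolean = locations.contains(id) && !invisible.contains(id)
   
     def main(args: Array[String]): Unit = {
       locations += "rdd_1_1"; invisible += "rdd_1_1" // first attempt caches the block
       invisible -= "rdd_1_1"                         // a task succeeds: block promoted
       println(isVisible("rdd_1_1"))                  // true
       locations -= "rdd_1_1"                         // executor lost: stored copy gone
       println(isVisible("rdd_1_1"))                  // false; re-caching must start over as invisible
     }
   }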



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-08 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1100268004


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1424,6 +1457,16 @@ private[spark] class BlockManager(
 blockStoreUpdater.save()
   }
 
+  // Check whether an rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+// If the rdd block visibility information is not available in the block manager,
+// ask the master for the information.
+if (blockInfoManager.isRDDBlockVisible(blockId)) {
+  return true
+}
+master.isRDDBlockVisible(blockId)

Review Comment:
   > > Once the rdd is removed, a broadcast message will be sent to each BlockManager to clean the cache.
   > 
   > I think the problem in this way is that it introduces the race condition between the visible cache and the broadcast message.
   > 
   > `master.isRDDBlockVisible(blockId)` shouldn't be frequently called, right? I thought `blockInfoManager.isRDDBlockVisible(blockId)` would be the most case for us, no?
   
   With locality scheduling, I think `master.isRDDBlockVisible(blockId)` should not be called frequently. If that assumption holds, then maybe we don't need to cache the results.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-04 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1096527082


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,14 +1325,47 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   * could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+  taskId: Long,
+  blockId: RDDBlockId,
+  level: StorageLevel,
+  classTag: ClassTag[T],
+  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+val isCacheVisible = isRDDBlockVisible(blockId)
+var computed: Boolean = false
+val getIterator = () => {
+  computed = true
+  makeIterator()
+}
+
+val res = getOrElseUpdate(blockId, level, classTag, getIterator)
+if (res.isLeft && !isCacheVisible) {
+  if (!computed) {
+// Loaded from cache, re-compute to update accumulators.
+makeIterator()
+  }
+  // Block exists and not visible, report taskId -> blockId info to master.
+  master.updateRDDBlockTaskInfo(blockId, taskId)
+}
+
+res
+  }
+
   /**
* Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
* to compute the block, persist it, and return its values.
*
* @return either a BlockResult if the block was successfully cached, or an iterator if the block
* could not be cached.
*/
-  def getOrElseUpdate[T](
+  private[spark] def getOrElseUpdate[T](

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-04 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1096527017


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,14 +1325,47 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   * could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+  taskId: Long,
+  blockId: RDDBlockId,
+  level: StorageLevel,
+  classTag: ClassTag[T],
+  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+val isCacheVisible = isRDDBlockVisible(blockId)
+var computed: Boolean = false
+val getIterator = () => {
+  computed = true
+  makeIterator()
+}
+
+val res = getOrElseUpdate(blockId, level, classTag, getIterator)
+if (res.isLeft && !isCacheVisible) {

Review Comment:
   Updated.



##
core/src/test/scala/org/apache/spark/AccumulatorSuite.scala:
##
@@ -89,6 +89,38 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
 assert(AccumulatorContext.get(10).isEmpty)
   }
 
+
+  test("SPARK-41497: accumulators should be reported in the case of task retry with rdd cache") {
+// Set up a cluster with 2 executors
+val conf = new SparkConf()
+  .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-04 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1096526943


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,14 +1325,47 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   * could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+  taskId: Long,
+  blockId: RDDBlockId,
+  level: StorageLevel,
+  classTag: ClassTag[T],
+  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+val isCacheVisible = isRDDBlockVisible(blockId)
+var computed: Boolean = false
+val getIterator = () => {
+  computed = true
+  makeIterator()
+}
+
+val res = getOrElseUpdate(blockId, level, classTag, getIterator)
+if (res.isLeft && !isCacheVisible) {
+  if (!computed) {
+// Loaded from cache, re-compute to update accumulators.
+makeIterator()
+  }

Review Comment:
   > Right..in this case, the rdd block locations for different data can be attached to the same rdd block id. So the reader could get different data for the same rdd block, which makes the rdd block data also indeterminate.
   
   @Ngone51 do you think we should solve this issue in this PR? It looks like a more general issue about indeterminate computation.
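   
   For context, here is a standalone sketch of the distinction being discussed, loosely modeled on Spark's notion of determinate / unordered / indeterminate output (the rule below is my own illustration, not what this PR implements): only an indeterminate re-computation can produce genuinely different block contents, which is why replacing the cache would only matter there.
   
   object DeterminismSketch {
     sealed trait Level
     case object Determinate extends Level   // same output in the same order on rerun
     case object Unordered extends Level     // same output set, but the order may differ
     case object Indeterminate extends Level // the output itself may differ on rerun
   
     // Whether a re-computed result could differ from an earlier cached copy.
     def recomputeMayDiffer(level: Level): Boolean = level match {
       case Determinate => false
       case _           => true
     }
   }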



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-04 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1096526714


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1424,6 +1457,16 @@ private[spark] class BlockManager(
 blockStoreUpdater.save()
   }
 
+  // Check whether an rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+// If the rdd block visibility information is not available in the block manager,
+// ask the master for the information.
+if (blockInfoManager.isRDDBlockVisible(blockId)) {
+  return true
+}
+master.isRDDBlockVisible(blockId)

Review Comment:
   One way in my mind is to cache the results (for visible rdd blocks) in the block manager. Once the rdd is removed, a broadcast message would be sent to each BlockManager to clean the cache.
   
   I am wondering whether it is worth doing this. Since there is locality scheduling, if tasks get scheduled to the executor where the cached block exists, there will be no calls to the master.
   
   Let me know your thoughts about this, thanks. cc @mridulm @Ngone51
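   
   A hedged sketch of this idea (all names illustrative): cache positive visibility answers on the executor side and drop them when the driver broadcasts that the rdd was removed. Caching only positive answers is safe because visibility is monotonic until the rdd itself is removed; note that the distributed race between the broadcast and an in-flight lookup, raised elsewhere in this thread, is not addressed here.
   
   import scala.collection.mutable
   
   class CachedVisibilityLookupSketch(askMaster: (Int, Int) => Boolean) {
     private val knownVisible = mutable.Set.empty[(Int, Int)] // (rddId, splitIndex)
   
     def isVisible(rddId: Int, split: Int): Boolean = synchronized {
       val key = (rddId, split)
       knownVisible.contains(key) || {
         val visible = askMaster(rddId, split)
         if (visible) knownVisible += key // only cache "true"; "false" may change later
         visible
       }
     }
   
     // Handler for the broadcast sent when an rdd is removed/unpersisted.
     def onRddRemoved(rddId: Int): Unit = synchronized {
       knownVisible --= knownVisible.filter(_._1 == rddId)
     }
   }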



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-02-04 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1096520993


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
+  // Mapping from task id to the set of rdd blocks which are generated by the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated by at least one successful task.
+  private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]

Review Comment:
   Just found one problem if we track the invisible RDD blocks.
   
   If we track the invisible RDD blocks, then we would mark an RDD block as visible (i.e., its cache can be used) only when it exists in `blockLocations` and does not exist in `invisibleRDDBlocks`. When `blockLocations` removes the block (which could be caused by an executor loss), we lose the information. Then the newly cached data won't be leveraged as soon as possible (right after the cache is generated and reported to the master).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-30 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1090773361


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,14 +1325,47 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   * could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+  taskId: Long,
+  blockId: RDDBlockId,
+  level: StorageLevel,
+  classTag: ClassTag[T],
+  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+val isCacheVisible = isRDDBlockVisible(blockId)
+var computed: Boolean = false
+val getIterator = () => {
+  computed = true
+  makeIterator()
+}
+
+val res = getOrElseUpdate(blockId, level, classTag, getIterator)
+if (res.isLeft && !isCacheVisible) {
+  if (!computed) {
+// Loaded from cache, re-compute to update accumulators.
+makeIterator()
+  }

Review Comment:
   > Particularly for unordered or indeterminate computation, this can impact the accumulators generated - and it wont match the result iterator returned.
   
   There could be some other issues with such scenarios: if we have blocks generated from different task attempts or from cache re-computation, each cached copy may also hold different data. Would this also be a problem?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-30 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1090491731


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -77,6 +77,11 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
+  // Mapping from task id to the set of rdd blocks which are generated by the task.
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
+  // Record the visible RDD blocks which have been generated by at least one successful task.
+  private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]

Review Comment:
   That would be better. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-30 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1090483687


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1424,6 +1457,16 @@ private[spark] class BlockManager(
 blockStoreUpdater.save()
   }
 
+  // Check whether an rdd block is visible or not.
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+// If the rdd block visibility information is not available in the block manager,
+// ask the master for the information.
+if (blockInfoManager.isRDDBlockVisible(blockId)) {
+  return true
+}
+master.isRDDBlockVisible(blockId)

Review Comment:
   Yes, I've thought about this. We would then need to add a mechanism to clean the cache. Let me re-think about this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-30 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1090482420


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,14 +1325,47 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   * could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+  taskId: Long,
+  blockId: RDDBlockId,
+  level: StorageLevel,
+  classTag: ClassTag[T],
+  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+val isCacheVisible = isRDDBlockVisible(blockId)
+var computed: Boolean = false
+val getIterator = () => {
+  computed = true
+  makeIterator()
+}
+
+val res = getOrElseUpdate(blockId, level, classTag, getIterator)
+if (res.isLeft && !isCacheVisible) {
+  if (!computed) {
+// Loaded from cache, re-compute to update accumulators.
+makeIterator()
+  }

Review Comment:
   Thanks, will make the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-30 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1090480637


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,14 +1325,47 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   * could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+  taskId: Long,
+  blockId: RDDBlockId,
+  level: StorageLevel,
+  classTag: ClassTag[T],
+  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+val isCacheVisible = isRDDBlockVisible(blockId)
+var computed: Boolean = false
+val getIterator = () => {
+  computed = true
+  makeIterator()
+}
+
+val res = getOrElseUpdate(blockId, level, classTag, getIterator)
+if (res.isLeft && !isCacheVisible) {
+  if (!computed) {
+// Loaded from cache, re-compute to update accumulators.
+makeIterator()
+  }
+  // Block exists and not visible, report taskId -> blockId info to master.
+  master.updateRDDBlockTaskInfo(blockId, taskId)
+}
+
+res
+  }
+
   /**
* Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
* to compute the block, persist it, and return its values.
*
* @return either a BlockResult if the block was successfully cached, or an iterator if the block
* could not be cached.
*/
-  def getOrElseUpdate[T](
+  private[spark] def getOrElseUpdate[T](

Review Comment:
   Sounds more reasonable. Will make the change. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-30 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1090479561


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -180,6 +186,14 @@ private[storage] class BlockInfoManager extends Logging {
 
   // ----------------------------------------------------------------------------
 
+  private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
+blockInfoWrappers.containsKey(blockId) && visibleRDDBlocks.contains(blockId)
+  }

Review Comment:
   Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-30 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1090478629


##
core/src/test/scala/org/apache/spark/AccumulatorSuite.scala:
##
@@ -89,6 +89,38 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
 assert(AccumulatorContext.get(10).isEmpty)
   }
 
+
+  test("SPARK-41497: accumulators should be reported in the case of task retry with rdd cache") {
+// Set up a cluster with 2 executors
+val conf = new SparkConf()
+  .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")

Review Comment:
   Thanks, will update the app name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-30 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1090477930


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,14 +1325,47 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   * could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+  taskId: Long,
+  blockId: RDDBlockId,
+  level: StorageLevel,
+  classTag: ClassTag[T],
+  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+val isCacheVisible = isRDDBlockVisible(blockId)
+var computed: Boolean = false
+val getIterator = () => {
+  computed = true
+  makeIterator()
+}
+
+val res = getOrElseUpdate(blockId, level, classTag, getIterator)
+if (res.isLeft && !isCacheVisible) {

Review Comment:
   Yes, that's true. Will make the change. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-15 Thread GitBox


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1070632750


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,6 +1325,64 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call the provided
+   * `makeIterator` method to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   * could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+  taskId: Long,
+  blockId: RDDBlockId,
+  level: StorageLevel,
+  classTag: ClassTag[T],
+  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+var getIterator = makeIterator
+
+// Attempt to read the block from local or remote storage. If it's present, then we don't need
+// to go through the local-get-or-put path.
+if (master.isRDDBlockVisible(blockId)) { // Read from cache only when the block is visible.
+  get[T](blockId)(classTag) match {
+case Some(block) =>
+  return Left(block)
+case _ =>
+// Need to compute the block.
+  }
+  // Initially we hold no locks on this block.
+} else {
+  // Need to compute the block: since the block may already exist, force
+  // the computation here.
+  val iterator = makeIterator()
+  getIterator = () => iterator
+}
+
+doPutIterator(blockId, getIterator, level, classTag, keepReadLock = true) match {
+  case None =>
+// Report taskId -> blockId relationship to master.
+master.updateRDDBlockTaskInfo(blockId, taskId)

Review Comment:
   In the latest implementation, the replication logic remains the same as before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-01-15 Thread GitBox


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1070632692


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -76,6 +76,8 @@ class BlockManagerMasterEndpoint(
 
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
+  private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]
+  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]

Review Comment:
   Thanks, fixed. When a task completes, the records will be cleaned up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org