[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-13 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1070229836


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -366,8 +359,35 @@ class BlockManagerMasterEndpoint(
 }
   }.getOrElse(Seq.empty)
 
+    val removeShuffleMergeFromShuffleServicesFutures =
+      externalBlockStoreClient.map { shuffleClient =>
+        val mergerLocations =
+          if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+            mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+          } else {
+            Seq.empty[BlockManagerId]
+          }
+        mergerLocations.map { bmId =>
+          Future[Boolean] {
+            shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId,
+              RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE)
+          }
+        }
+      }.getOrElse(Seq.empty)
+
+    val removeMsg = RemoveShuffle(shuffleId)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+    if (testing) {
+      RpcUtils.INFINITE_TIMEOUT.awaitResult(Future.sequence(removeShuffleFromExecutorsFutures))

Review Comment:
   Sorry about this. I will remove it.
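   The hunk above sends one `removeShuffleMerge` request per merger location, each wrapped in its own future. A minimal Java sketch of that fan-out pattern follows; `MergerLocation`, `ShuffleMergeRemover` and `MergeCleanupFanOut` are hypothetical names (not Spark classes), and -1 is only an illustrative value for the delete-all sentinel.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

// Hypothetical holder for a merger location (host and port of an external shuffle service).
final class MergerLocation {
  final String host;
  final int port;

  MergerLocation(String host, int port) {
    this.host = host;
    this.port = port;
  }
}

// Hypothetical client interface standing in for ExternalBlockStoreClient.removeShuffleMerge.
interface ShuffleMergeRemover {
  boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId);
}

final class MergeCleanupFanOut {
  // Illustrative sentinel meaning "delete every merged shuffle for this shuffle id".
  static final int DELETE_ALL_MERGED_SHUFFLE = -1;

  /** Sends one removal request per merger location and collects the results in order. */
  static List<Boolean> removeFromAllMergers(
      ShuffleMergeRemover client, List<MergerLocation> mergers, int shuffleId,
      ExecutorService pool) {
    List<CompletableFuture<Boolean>> futures = mergers.stream()
        .map(loc -> CompletableFuture
            .supplyAsync(() -> client.removeShuffleMerge(
                loc.host, loc.port, shuffleId, DELETE_ALL_MERGED_SHUFFLE), pool)
            // A failed request counts as "nothing removed" rather than failing the whole cleanup.
            .exceptionally(t -> false))
        .collect(Collectors.toList());
    // join() blocks here only to keep the sketch simple.
    return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
  }
}
```

   Mapping failures to `false` mirrors the `recover { ... false }` used for the executor-side `RemoveShuffle` futures in the same hunk.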



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1069046358


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,23 @@ public void onFailure(Throwable e) {
 }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)
+              .toByteBuffer());
+      // TODO(SPARK-42025): Add some error logs for RemoveShuffleMerge RPC.
+    } catch (Exception e) {
+      logger.error("Exception while sending RemoveShuffleMerge request to {}:{}",

Review Comment:
   Do we need to add a new RPC result type and a callback here?
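   One way to give the fire-and-forget `client.send(...)` a result is a request/response call whose callback completes a future. A minimal Java sketch, assuming a hypothetical `AsyncTransport`/`ResponseCallback` pair (loosely modeled on a callback-style RPC client, not Spark's exact API) and an assumed one-byte response where 1 means "removed":

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Hypothetical callback shape: the transport reports either a response payload or a failure.
interface ResponseCallback {
  void onSuccess(ByteBuffer response);

  void onFailure(Throwable cause);
}

// Hypothetical async transport: sends a request and later invokes the callback.
interface AsyncTransport {
  void sendRpc(ByteBuffer request, ResponseCallback callback);
}

final class RemoveShuffleMergeWithResult {
  /**
   * Wraps a callback-style RPC in a CompletableFuture so the caller learns whether
   * the shuffle service acknowledged the removal (assumed encoding: 1 = removed).
   */
  static CompletableFuture<Boolean> remove(AsyncTransport transport, ByteBuffer request) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    transport.sendRpc(request, new ResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        // Absolute get: reads the first byte without moving the buffer position.
        result.complete(response.remaining() > 0 && response.get(response.position()) == 1);
      }

      @Override
      public void onFailure(Throwable cause) {
        // Treat transport failures as "not removed"; completeExceptionally would propagate them.
        result.complete(false);
      }
    });
    return result;
  }
}
```

   Whether a result type is worth the extra protocol surface depends on whether the caller acts on it; if cleanup is best-effort, a logged failure may be enough.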






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1068910109


##
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala:
##
@@ -913,6 +918,59 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
 slaveRpcEnv.shutdown()
   }
 
+  test("SPARK-40480: shuffle remove should cleanup merged files as well") {
+    val newConf = new SparkConf
+    newConf.set("spark.shuffle.push.enabled", "true")
+    newConf.set("spark.shuffle.service.enabled", "true")
+    newConf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    newConf.set(IS_TESTING, true)
+
+    val SHUFFLE_ID = 10
+    // needs TorrentBroadcast so need a SparkContext
+    withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
+      val rpcEnv = sc.env.rpcEnv
+      val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+
+      val blockStoreClient = mock(classOf[ExternalBlockStoreClient])
+      val blockManagerMasterEndpoint = new BlockManagerMasterEndpoint(
+        rpcEnv,
+        sc.isLocal,
+        sc.conf,
+        sc.listenerBus,
+        Some(blockStoreClient),
+        // We dont care about this ...
+        new concurrent.TrieMap[BlockManagerId, BlockManagerInfo](),
+        masterTracker,
+        sc.env.shuffleManager,
+        true
+      )
+      rpcEnv.stop(sc.env.blockManager.master.driverEndpoint)
+      sc.env.blockManager.master.driverEndpoint =
+        rpcEnv.setupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME,
+          blockManagerMasterEndpoint)
+
+      masterTracker.registerShuffle(SHUFFLE_ID, 10, 10)
+      val mergerLocs = (1 to 10).map(x => BlockManagerId(s"exec-$x", s"host-$x", x))
+      masterTracker.registerShufflePushMergerLocations(SHUFFLE_ID, mergerLocs)
+
+      assert(masterTracker.getShufflePushMergerLocations(SHUFFLE_ID).map(_.host).toSet ==
+        mergerLocs.map(_.host).toSet)
+
+      val foundHosts = JCollections.synchronizedSet(new JHashSet[String]())
+      when(blockStoreClient.removeShuffleMerge(any(), any(), any(), any())).thenAnswer(
+        (m: InvocationOnMock) => {
+          val host = m.getArgument(0).asInstanceOf[String]
+          val shuffleId = m.getArgument(2).asInstanceOf[Int]
+          assert(shuffleId == SHUFFLE_ID)
+          foundHosts.add(host)
+          true
+        })
+
+      sc.cleaner.get.doCleanupShuffle(SHUFFLE_ID, blocking = true)
+      assert(foundHosts.asScala == mergerLocs.map(_.host).toSet)
+    }
+  }

Review Comment:
   Backported the code, many thanks.






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1068895209


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -470,6 +530,40 @@ void closeAndDeleteOutdatedPartitions(
   });
   }
 
+  void deleteMergedFiles(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+      AppShuffleInfo appShuffleInfo,
+      int[] reduceIds,
+      boolean deleteFromDB) {
+    if(deleteFromDB) {

Review Comment:
   Formatted the code.






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067998412


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+    val mergerLocations =
+      if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+        mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+      } else {
+        Seq.empty[BlockManagerId]
+      }

Review Comment:
   Thanks @mridulm, I added this UT.
   Yes, `RemoveShuffle` is sent to the storage endpoint in `removeShuffle`.
   After adding more debug logs in our test cluster, I can see that shuffle 2 is removed by thread `block-manager-storage-async-thread-pool-51` in the `BlockManagerStorageEndpoint` class:
   ```
   23/01/12 00:43:40 WARN [block-manager-storage-async-thread-pool-51] storage.BlockManagerStorageEndpoint:72 : Call mapOutputTracker.unregisterShuffle for 2
   23/01/12 00:43:40 WARN [block-manager-storage-async-thread-pool-51] spark.MapOutputTrackerMaster:72 : shuffleStatuses.remove(shuffleId) for shuffle 2
   23/01/12 00:43:40 WARN [dispatcher-BlockManagerMaster] storage.BlockManagerMasterEndpoint:72 : Call getShufflePushMergerLocations for 2
   23/01/12 00:43:40 WARN [dispatcher-BlockManagerMaster] spark.MapOutputTrackerMaster:72 : shuffleStatuses.get(shuffleId) = None
   23/01/12 00:43:40 WARN [dispatcher-BlockManagerMaster] spark.MapOutputTrackerMaster:72 : shuffleStatuses.get(shuffleId).map(_.getShufflePushMergerLocations) = None
   23/01/12 00:43:40 WARN [Spark Context Cleaner] spark.MapOutputTrackerMaster:72 : shuffleStatuses.remove(shuffleId) for shuffle 2
   ```






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067816705


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
 }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)

Review Comment:
   Filed a ticket for this https://issues.apache.org/jira/browse/SPARK-42025






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067809378


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+    val mergerLocations =
+      if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+        mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+      } else {
+        Seq.empty[BlockManagerId]
+      }

Review Comment:
   1. The driver's BlockManager is also registered with the master as a BlockManagerInfo:
   ```
   SparkContext: _env.blockManager.initialize(_applicationId)
   BlockManager:
   
   val idFromMaster = master.registerBlockManager(
     id,
     diskBlockManager.localDirsString,
     maxOnHeapMemory,
     maxOffHeapMemory,
     storageEndpoint)
   ```
   
   2. The driver's storage endpoint receives the RemoveShuffle RPC and removes the shuffle's status from MapOutputTrackerMaster:
   ```
   BlockManagerStorageEndpoint:
   case RemoveShuffle(shuffleId) =>
     doAsync[Boolean]("removing shuffle " + shuffleId, context) {
       if (mapOutputTracker != null) {
         logWarning(s"Call mapOutputTracker.unregisterShuffle for $shuffleId")
         mapOutputTracker.unregisterShuffle(shuffleId)
       }
   ---
   MapOutputTrackerMaster:
     def unregisterShuffle(shuffleId: Int): Unit = {
       shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
   
   ```
   3. As a result, no shuffle status is left in MapOutputTracker:
   ```
   BlockManagerMasterEndpoint:
   
   val mergerLocations =
     if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
       mapOutputTracker.getShufflePushMergerLocations(shuffleId)
     } else {
       Seq.empty[BlockManagerId]
     }
   
   ---
   MapOutputTrackerMaster:
   
     override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
       logWarning("shuffleStatuses.get(shuffleId).map(_.getShufflePushMergerLocations) = " +
         s"${shuffleStatuses.get(shuffleId).map(_.getShufflePushMergerLocations)}")
       shuffleStatuses.get(shuffleId).map(_.getShufflePushMergerLocations).getOrElse(Seq.empty)
     }
   ```
   
   4. For the same reason, it seems `removeShuffleFromShuffleServicesFutures` will also be empty.
   
   
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L336
   






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067694549


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+    val mergerLocations =
+      if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+        mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+      } else {
+        Seq.empty[BlockManagerId]
+      }

Review Comment:
   Here are my debug logs:
   ```
   23/01/11 21:34:33 WARN [block-manager-storage-async-thread-pool-120] storage.BlockManagerStorageEndpoint:72 : Call mapOutputTracker.unregisterShuffle for 12
   23/01/11 21:34:33 WARN [dispatcher-BlockManagerMaster] storage.BlockManagerMasterEndpoint:72 : Call getShufflePushMergerLocations for 12
   23/01/11 21:34:33 WARN [block-manager-storage-async-thread-pool-74] storage.BlockManagerStorageEndpoint:72 : Call mapOutputTracker.unregisterShuffle for 11
   23/01/11 21:34:33 WARN [dispatcher-BlockManagerMaster] storage.BlockManagerMasterEndpoint:72 : Call getShufflePushMergerLocations for 11
   ```
   
   If we move this code after the point where the RemoveShuffle RPC is sent to BlockManagerStorageEndpoint, we may get empty merger locations:
   ```
   val removeMsg = RemoveShuffle(shuffleId)
   val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
     bm.storageEndpoint.ask[Boolean](removeMsg).recover {
       // use false as default value means no shuffle data were removed
       handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
     }
   }.toSeq
   ```
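   The ordering problem in the logs can be modeled deterministically. In this minimal Java sketch (hypothetical class; a `ConcurrentHashMap` stands in for `shuffleStatuses`), `lookupAfterRemove` forces the interleaving seen above, where the storage endpoint's unregister wins, while `lookupBeforeRemove` snapshots the merger locations before `RemoveShuffle` is sent.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class MergerLookupOrdering {
  // Stand-in for MapOutputTrackerMaster.shuffleStatuses: shuffleId -> merger hosts.
  final Map<Integer, List<String>> mergerLocations = new ConcurrentHashMap<>();

  List<String> getShufflePushMergerLocations(int shuffleId) {
    return mergerLocations.getOrDefault(shuffleId, Collections.emptyList());
  }

  // Stand-in for the storage endpoint handling RemoveShuffle: drops the shuffle status.
  void unregisterShuffle(int shuffleId) {
    mergerLocations.remove(shuffleId);
  }

  /** Racy ordering, with the losing interleaving forced: unregister runs before the lookup. */
  List<String> lookupAfterRemove(int shuffleId) {
    unregisterShuffle(shuffleId);  // models the async RemoveShuffle handler finishing first
    return getShufflePushMergerLocations(shuffleId);
  }

  /** Safer ordering: snapshot the merger locations before sending RemoveShuffle. */
  List<String> lookupBeforeRemove(int shuffleId) {
    List<String> snapshot = getShufflePushMergerLocations(shuffleId);
    unregisterShuffle(shuffleId);
    return snapshot;
  }
}
```

   In the real system the interleaving is nondeterministic, which is why reading the locations first is the robust choice rather than relying on the RPC being slow.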






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1066941140


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+              + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteCurrentMergedShuffle =
+          msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+          msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+          msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
+      if(deleteCurrentMergedShuffle) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() ->
+              closeAndDeleteOutdatedPartitions(
+                  currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+        } else {
+          submitCleanupTask(() ->
+              deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
+                  mergePartitionsInfo.getReduceIds(), false));
+        }
+      } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+        throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " +
+            "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ",
+            msg.appId, msg.shuffleId, shuffleMergeId, mergePartitionsInfo.shuffleMergeId));
+      } else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {

Review Comment:
   Combine `deleteCurrentMergedShuffle || shuffleMergeId > mergePartitionsInfo.shuffleMergeId` to simplify the code, and add some comments. cc @mridulm
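   A minimal Java sketch of the combined condition, reduced to a pure decision function; `RemoveShuffleMergeDecision` and its `Action` enum are hypothetical, and -1 is only an illustrative value for the delete-all sentinel. It decides only whether the currently hosted merged data should be deleted; the DB write and the new `AppShuffleMergePartitionsInfo` for a newer merge id are left out.

```java
final class RemoveShuffleMergeDecision {
  // Illustrative sentinel; the PR uses RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE.
  static final int DELETE_ALL_MERGED_SHUFFLE = -1;

  enum Action { DELETE_CURRENT, REJECT_OLDER_REQUEST }

  /** Decides what to do with the merged data hosted for a shuffle, given the requested id. */
  static Action decide(int requestedMergeId, int currentMergeId) {
    boolean deleteCurrentMergedShuffle =
        requestedMergeId == DELETE_ALL_MERGED_SHUFFLE || requestedMergeId == currentMergeId;
    // "All", the current merge id, or a newer merge id all mean the data we host is
    // no longer needed, so they share one branch.
    if (deleteCurrentMergedShuffle || requestedMergeId > currentMergeId) {
      return Action.DELETE_CURRENT;
    }
    // The request names a merge id older than the one we host: refuse it.
    return Action.REJECT_OLDER_REQUEST;
  }
}
```

   Because `deleteCurrentMergedShuffle` already covers equality, the combined test is equivalent to `requestedMergeId == DELETE_ALL_MERGED_SHUFFLE || requestedMergeId >= currentMergeId`.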






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1066912578


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
 }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)

Review Comment:
   Do we really care about the result of the removal? If something goes wrong, will we still remove the leaked merged shuffle files?
   And if the shuffle service never returns an RPC result (for example, when the ESS is down), will the RPCs in `outstandingRpcs` leak?
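   Regarding the leak question, a common pattern is to bound every in-flight request with a timeout so its bookkeeping entry is dropped even if the server never answers. A minimal Java sketch with a hypothetical `OutstandingRpcTracker` (not Spark's own bookkeeping), assuming Java 9+ for `CompletableFuture.orTimeout`:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class OutstandingRpcTracker {
  /** Handle returned to the caller: the request id plus a future for the outcome. */
  static final class Pending {
    final long id;
    final CompletableFuture<Boolean> result;

    Pending(long id, CompletableFuture<Boolean> result) {
      this.id = id;
      this.result = result;
    }
  }

  private final Map<Long, CompletableFuture<Boolean>> outstanding = new ConcurrentHashMap<>();
  private final AtomicLong nextId = new AtomicLong();

  /** Registers an in-flight request that fails with a TimeoutException after timeoutMs. */
  Pending register(long timeoutMs) {
    long id = nextId.incrementAndGet();
    CompletableFuture<Boolean> source = new CompletableFuture<>();
    outstanding.put(id, source);
    // Whatever completes the request (response, failure or timeout) also drops its entry;
    // the caller's future completes only after that removal has run.
    CompletableFuture<Boolean> tracked = source.whenComplete((ok, err) -> outstanding.remove(id));
    source.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);  // requires Java 9+
    return new Pending(id, tracked);
  }

  /** Called when the server answers; late answers for already timed-out ids are ignored. */
  void onResponse(long id, boolean removed) {
    CompletableFuture<Boolean> source = outstanding.get(id);
    if (source != null) {
      source.complete(removed);
    }
  }

  int outstandingCount() {
    return outstanding.size();
  }
}
```

   The timeout turns an unresponsive shuffle service into an ordinary failure, so the map stays bounded regardless of whether the result is otherwise used.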






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1066901294


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+    val mergerLocations =
+      if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+        mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+      } else {
+        Seq.empty[BlockManagerId]
+      }

Review Comment:
   For `def removeShuffle(shuffleId: Int): Future[Seq[Boolean]]`  in 
`BlockManagerMasterEndpoint`, I prefer to put this code before 
   ```
   val removeMsg = RemoveShuffle(shuffleId)
   val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
     bm.storageEndpoint.ask[Boolean](removeMsg).recover {
       // use false as default value means no shuffle data were removed
       handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
     }
   }.toSeq
   ```
   because `mapOutputTracker` may unregister this shuffle in `BlockManagerStorageEndpoint` before `mapOutputTracker.getShufflePushMergerLocations(shuffleId)` is called:
   ```
   case RemoveShuffle(shuffleId) =>
     doAsync[Boolean]("removing shuffle " + shuffleId, context) {
       if (mapOutputTracker != null) {
         mapOutputTracker.unregisterShuffle(shuffleId)
       }
       SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
     }
   ```






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-30 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1059572807


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+              + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      boolean deleteCurrent =
+          msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+          msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+      if(deleteCurrent) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() -> {
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+          });
+        } else {
+          submitCleanupTask(() -> {
+            deleteMergedFiles(currentAppAttemptShuffleMergeId,
+                mergePartitionsInfo.getReduceIds());
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+            mergePartitionsInfo.setReduceIds(new int[0]);
+          });
+        }
+      } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+        throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " +
+            "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ",
+            msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId));
+      } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+        // cleanup request for newer shuffle - remove the outdated data we have.
+        submitCleanupTask(() -> {
+          closeAndDeleteOutdatedPartitions(
+              currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+          writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   For example, take `appAttemptShuffleMergeId_1 = AppAttemptShuffleMergeId(app_1, appAttempt_1, shuffle_1, shuffleMergeId_1)`: the main thread submits a cleaner task with `appAttemptShuffleMergeId_1` and then calls `writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId_1)`.
   The cleaner thread will later remove `appAttemptShuffleMergeId_1` from the DB, but don't we expect `appAttemptShuffleMergeId_1` to stay in the DB so that it is consistent with `appShuffleInfo.shuffles`?
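   The interleaving in question can be reduced to a toy model. In this minimal Java sketch (hypothetical class; a `HashSet` stands in for the shuffle service DB, and a queue of `Runnable`s drained on the same thread stands in for the cleaner thread), `replaceUnsafe` lets the deferred cleanup delete the key that was just written, while `replaceSafe` has the cleanup touch only the key being retired.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

final class MergeInfoDbOrdering {
  // Stand-in for the shuffle service DB (single-threaded here, so a plain HashSet is enough).
  final Set<String> db = new HashSet<>();
  // Stand-in for the cleaner thread: tasks run later, when runCleanups() is called.
  final Queue<Runnable> cleanupQueue = new ArrayDeque<>();

  /** Problematic: the deferred cleanup removes the same key the caller has just written. */
  void replaceUnsafe(String key) {
    cleanupQueue.add(() -> db.remove(key));
    db.add(key);
  }

  /** Safer: the cleanup only touches the retired key, so the new key survives. */
  void replaceSafe(String oldKey, String newKey) {
    db.add(newKey);
    cleanupQueue.add(() -> db.remove(oldKey));
  }

  void runCleanups() {
    while (!cleanupQueue.isEmpty()) {
      cleanupQueue.poll().run();
    }
  }
}
```

   After `replaceUnsafe` the DB no longer matches the in-memory state once the cleanup runs, which is the inconsistency raised above.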






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-30 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1059407327


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+              + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteAllMergedShuffle =
+          msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+          msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+          msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
+      if(deleteAllMergedShuffle) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() -> {
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+          });
+        } else {
+          submitCleanupTask(() -> {
+            deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
+                mergePartitionsInfo.getReduceIds());
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+            mergePartitionsInfo.setReduceIds(new int[0]);

Review Comment:
   done



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+              + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteAllMergedShuffle =
+          msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+          msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+          msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);

Review Comment:
   done




[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-30 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1059407175


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+  + "with the current attempt id %s stored in shuffle service for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+  if (mergePartitionsInfo == null) {
+if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+  return null;
+} else {
+  writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+  return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+}
+  }
+  boolean deleteAllMergedShuffle =
+  msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+  msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);

Review Comment:
   I'm sorry, but `AppShuffleMergePartitionsInfo mergePartitionsInfo` cannot be cast to `AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId`, can it?



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+  + "with the current attempt id %s stored in shuffle service for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+  if (mergePartitionsInfo == null) {
+if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+  return null;
+} else {
+  writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+  return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+}
+  }
+  boolean deleteAllMergedShuffle =

Review Comment:
   done






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-30 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1059406697


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
   });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, int[] reduceIds) {
+removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);
+int shuffleId = appAttemptShuffleMergeId.shuffleId;
+int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
+for (int reduceId : reduceIds) {
+  try {
+File dataFile =
+appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
+dataFile.delete();
+  } catch (Exception e) {

Review Comment:
   Add a consolidated log message at the end.
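   A consolidated message means collecting failures inside the per-file loop and reporting them once after it. A minimal sketch of that pattern, using `java.util.logging` in place of the project's logger and a hypothetical `ConsolidatedDelete` helper; it treats both a `false` return from `File.delete()` and a thrown exception as a failure:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical helper: deletes files and emits at most one warning for all failures.
final class ConsolidatedDelete {
  private static final Logger LOG = Logger.getLogger(ConsolidatedDelete.class.getName());

  // Returns the paths that could not be deleted so callers can act on them.
  static List<String> deleteAll(List<File> files) {
    List<String> failed = new ArrayList<>();
    for (File f : files) {
      try {
        if (!f.delete()) {
          failed.add(f.getPath()); // delete() reports failure via its return value
        }
      } catch (SecurityException e) {
        failed.add(f.getPath()); // the only exception File.delete() declares
      }
    }
    if (!failed.isEmpty()) {
      // One log line at the end instead of one per reduce partition.
      LOG.warning("Failed to delete " + failed.size() + " merged file(s): " + failed);
    }
    return failed;
  }
}
```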






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-30 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1059390693


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+  + "with the current attempt id %s stored in shuffle service for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+  boolean deleteCurrent =
+  msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+  if(deleteCurrent) {
+// request to clean up shuffle we are currently hosting
+if (!mergePartitionsInfo.isFinalized()) {
+  submitCleanupTask(() -> {
+closeAndDeleteOutdatedPartitions(
+currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+  });
+} else {
+  submitCleanupTask(() -> {
+deleteMergedFiles(currentAppAttemptShuffleMergeId,
+mergePartitionsInfo.getReduceIds());
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+mergePartitionsInfo.setReduceIds(new int[0]);
+  });
+}
+  } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " +
+"application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ",
+msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId));
+  } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+// cleanup request for newer shuffle - remove the outdated data we have.
+submitCleanupTask(() -> {
+  closeAndDeleteOutdatedPartitions(
+  currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+  writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   With the current code, if `currentAppAttemptShuffleMergeId` equals `appAttemptShuffleMergeId`:
   * The main thread runs `writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId)` first.
   * The cleaner task runs `removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId)` later, so `appAttemptShuffleMergeId` is no longer in the DB.
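   The ordering problem above can be reproduced in isolation. A hypothetical model, not Spark code: a concurrent `Set` stands in for the state DB and a single-thread executor for the cleaner thread; the caller writes the new id immediately while the cleanup task, running later, removes the hosted id:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the race: write on the caller thread, removal on the cleaner.
final class DbOrderingRace {
  // Returns whether newKey is still recorded once the cleanup task has finished.
  static boolean newEntrySurvives(String currentKey, String newKey)
      throws InterruptedException {
    Set<String> db = ConcurrentHashMap.newKeySet();
    db.add(currentKey); // entry for the merge currently being hosted
    ExecutorService cleaner = Executors.newSingleThreadExecutor();
    // 1. The caller thread records the requested merge id right away.
    db.add(newKey);
    // 2. The cleanup task runs afterwards and removes the hosted merge id.
    cleaner.submit(() -> {
      db.remove(currentKey);
    });
    cleaner.shutdown();
    cleaner.awaitTermination(10, TimeUnit.SECONDS);
    // When currentKey equals newKey, step 2 erases what step 1 wrote.
    return db.contains(newKey);
  }
}
```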






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-22 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1055375593


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+  + "with the current attempt id %s stored in shuffle service for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {

Review Comment:
   Change `computeIfPresent` to `compute`, and handle the case where `mergePartitionsInfo == null`:
   ```
   if (mergePartitionsInfo == null) {
     if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
       return null;
     } else {
       writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
           msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
       return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
     }
   }
   ```
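   The suggestion relies on `ConcurrentHashMap.compute` semantics: unlike `computeIfPresent`, the remapping function is also invoked when no mapping exists (with `null` as the current value), and returning `null` leaves the key unmapped. A minimal sketch, assuming `appShuffleInfo.shuffles` behaves like a `ConcurrentHashMap` and using a simplified hypothetical `MergeInfo` in place of `AppShuffleMergePartitionsInfo`:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the `compute` pattern; MergeInfo is a hypothetical stand-in holding
// only a merge id and a finalized flag.
final class ComputeWithNullHandling {
  static final int DELETE_ALL_MERGED_SHUFFLE = -1; // assumed sentinel value

  static final class MergeInfo {
    final int shuffleMergeId;
    final boolean finalized;

    MergeInfo(int shuffleMergeId, boolean finalized) {
      this.shuffleMergeId = shuffleMergeId;
      this.finalized = finalized;
    }
  }

  static void remove(ConcurrentHashMap<Integer, MergeInfo> shuffles, int shuffleId, int mergeId) {
    shuffles.compute(shuffleId, (id, info) -> {
      if (info == null) {
        // Nothing hosted yet. "Delete all" returns null, so no mapping is created;
        // a specific merge id gets a finalized placeholder entry (presumably so
        // later pushes for that id are treated as too late).
        return mergeId == DELETE_ALL_MERGED_SHUFFLE ? null : new MergeInfo(mergeId, true);
      }
      // Existing entry: the real code schedules cleanup here; the sketch keeps it.
      return info;
    });
  }
}
```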






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-22 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1055372990


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+  + "with the current attempt id %s stored in shuffle service for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+  boolean deleteCurrent =
+  msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+  if(deleteCurrent) {
+// request to clean up shuffle we are currently hosting
+if (!mergePartitionsInfo.isFinalized()) {
+  submitCleanupTask(() -> {
+closeAndDeleteOutdatedPartitions(
+currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+  });
+} else {
+  submitCleanupTask(() -> {
+deleteMergedFiles(currentAppAttemptShuffleMergeId,
+mergePartitionsInfo.getReduceIds());
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+mergePartitionsInfo.setReduceIds(new int[0]);
+  });
+}
+  } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " +
+"application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ",
+msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId));
+  } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+// cleanup request for newer shuffle - remove the outdated data we have.
+submitCleanupTask(() -> {
+  closeAndDeleteOutdatedPartitions(
+  currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+  writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   The DB update should be done after `closeAndDeleteOutdatedPartitions` or `deleteMergedFiles` in the `mergedShuffleCleaner` thread; otherwise there will be a multi-thread concurrency issue.
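   Running both DB operations in the same cleanup task makes the order deterministic: the hosted id is removed before the new id is written, even when the two ids are equal. A hypothetical model, not Spark code, with a concurrent `Set` standing in for the state DB and a single-thread executor for `mergedShuffleCleaner`:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: removal and write happen sequentially on the cleaner thread.
final class OrderedCleanup {
  static boolean newEntrySurvives(String currentKey, String newKey)
      throws InterruptedException {
    Set<String> db = ConcurrentHashMap.newKeySet();
    db.add(currentKey); // entry for the merge being cleaned up
    ExecutorService cleaner = Executors.newSingleThreadExecutor();
    cleaner.submit(() -> {
      // Stands in for deleting the files and removing the hosted id from the DB.
      db.remove(currentKey);
      // Stands in for writeAppAttemptShuffleMergeInfoToDB, now run afterwards.
      db.add(newKey);
    });
    cleaner.shutdown();
    cleaner.awaitTermination(10, TimeUnit.SECONDS);
    return db.contains(newKey);
  }
}
```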






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-22 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1055371103


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
   });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, int[] reduceIds) {
+removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);

Review Comment:
   Pass `appShuffleInfo` to the `deleteMergedFiles` method to avoid an unnecessary method call.






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-22 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1055370408


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
   });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, int[] reduceIds) {
+removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);
+int shuffleId = appAttemptShuffleMergeId.shuffleId;
+int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
+for (int reduceId : reduceIds) {
+  try {
+File dataFile =
+appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
+dataFile.delete();
+  } catch (Exception e) {

Review Comment:
   Could we log a warning if `File.delete()` returns false?
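   An alternative to checking the boolean from `File.delete()` is `java.nio.file.Files.delete`, which throws an `IOException` (for example `NoSuchFileException`) that carries the reason, so the warning can say why the deletion failed. A sketch of that swapped-in technique, not what the PR does, with a hypothetical `DeleteWithReason` helper:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical helper: returns null on success, otherwise a reason for a warn log.
final class DeleteWithReason {
  static String tryDelete(Path path) {
    try {
      Files.delete(path);
      return null;
    } catch (NoSuchFileException e) {
      return "missing: " + path; // already gone, often harmless during cleanup
    } catch (IOException e) {
      return "failed: " + path + " (" + e + ")";
    }
  }
}
```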






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-21 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1055160153


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
   });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, int[] reduceIds) {
+removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);

Review Comment:
   Thanks for your review.
   But in this method, the merged shuffle files depend only on appId, shuffleId, shuffleMergeId and reduceId, so doesn't it also make sense with a different attempt id?
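   The argument is that a merged data file is identified by appId, shuffleId, shuffleMergeId and reduceId alone. A sketch of such a naming function, with a hypothetical `shuffleMerged_` prefix and `.data` suffix chosen for illustration (not taken from the PR); it does not model whether the enclosing merge directory depends on the attempt:

```java
// Hypothetical naming scheme: the attempt id is not an input.
final class MergedFileName {
  static String dataFileName(String appId, int shuffleId, int shuffleMergeId, int reduceId) {
    return String.format("shuffleMerged_%s_%d_%d_%d.data",
        appId, shuffleId, shuffleMergeId, reduceId);
  }
}
```

Because the attempt id never reaches `dataFileName`, two requests that differ only in attempt id resolve to the same file name under this scheme.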






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-20 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1053448485


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+  + "with the current attempt id %s stored in shuffle service for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+  boolean deleteCurrent =
+  msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+  if(deleteCurrent) {
+// request to clean up shuffle we are currently hosting
+if (!mergePartitionsInfo.isFinalized()) {
+  submitCleanupTask(() -> {
+closeAndDeleteOutdatedPartitions(
+currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   Now we keep the latest info for every shuffle in `appShuffleInfo.shuffles` and in the DB until `applicationRemoved` is called or the shuffle service is restarted. This is a potential risk for long-running applications such as SparkThriftServer.
   We can fix it in a follow-up PR.
   @yabola @mridulm Could you help check whether I'm right?






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-20 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1053386036


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+  + "with the current attempt id %s stored in shuffle service for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+  boolean deleteCurrent =
+  msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+  if(deleteCurrent) {
+// request to clean up shuffle we are currently hosting
+if (!mergePartitionsInfo.isFinalized()) {
+  submitCleanupTask(() -> {
+closeAndDeleteOutdatedPartitions(
+currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+  });
+} else {
+  submitCleanupTask(() -> {
+deleteMergedFiles(currentAppAttemptShuffleMergeId,
+mergePartitionsInfo.getReduceIds());

Review Comment:
   ```
   if (!mergePartitionsInfo.isFinalized()) {
     submitCleanupTask(() -> {
       closeAndDeleteOutdatedPartitions(
           currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
       writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
     });
   } else {
     submitCleanupTask(() -> {
       deleteMergedFiles(currentAppAttemptShuffleMergeId,
           mergePartitionsInfo.getReduceIds());
       writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
       mergePartitionsInfo.setReduceIds(new int[0]);
     });
   }
   ```
   
   Yes, if `mergePartitionsInfo` is not finalized (`!mergePartitionsInfo.isFinalized()`), it will clean up the current partitions.
   This depends on the `AppShuffleMergePartitionsInfo` stored in `appShuffleInfo.shuffles`.






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-17 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1051520422


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Remove the merged data for a given shuffle.
+ * Returns {@link Boolean}
+ *
+ * @since 3.4.0
+ */
+public class RemoveShuffleMerge extends BlockTransferMessage {
+  public final String appId;

Review Comment:
   done
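   `RemoveShuffleMerge` carries an application id plus three integers (`appAttemptId`, `shuffleId`, `shuffleMergeId`, as used by `removeShuffleMerge`). A round-trip sketch with `java.nio.ByteBuffer`, using a hypothetical length-prefixed layout for illustration; it is not claimed to match the bytes Spark's `Encoders` produce:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec: int length + UTF-8 appId, then the three int fields.
final class RemoveShuffleMergeCodec {
  static final class Msg {
    final String appId;
    final int appAttemptId;
    final int shuffleId;
    final int shuffleMergeId;

    Msg(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
      this.appId = appId;
      this.appAttemptId = appAttemptId;
      this.shuffleId = shuffleId;
      this.shuffleMergeId = shuffleMergeId;
    }
  }

  static ByteBuffer encode(Msg m) {
    byte[] app = m.appId.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(4 + app.length + 3 * 4);
    buf.putInt(app.length).put(app)
        .putInt(m.appAttemptId).putInt(m.shuffleId).putInt(m.shuffleMergeId);
    buf.flip(); // switch from writing to reading
    return buf;
  }

  static Msg decode(ByteBuffer buf) {
    byte[] app = new byte[buf.getInt()];
    buf.get(app);
    // Java evaluates arguments left to right, so the ints are read in order.
    return new Msg(new String(app, StandardCharsets.UTF_8),
        buf.getInt(), buf.getInt(), buf.getInt());
  }
}
```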






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-17 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1051520374


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -702,7 +722,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
 "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId,
 msg.shuffleMergeId, partition.reduceId);
   } finally {
-partition.closeAllFilesAndDeleteIfNeeded(false);
+Boolean deleteFile = partition.mapTracker.getCardinality() == 0;
+partition.closeAllFilesAndDeleteIfNeeded(deleteFile);

Review Comment:
   We won't return the reduceIds whose `partition.mapTracker.getCardinality() == 0`, so we can delete their files with `closeAllFilesAndDeleteIfNeeded` right away.
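   The rule above: a partition whose map tracker is empty received no merged blocks, is not reported in the merge statuses, and so its files can be deleted during finalization. A sketch with `java.util.BitSet` standing in for the RoaringBitmap map tracker and a hypothetical `FinalizeFilter` helper:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the finalize-time split between reported and deletable partitions.
final class FinalizeFilter {
  // Returns the reduce ids to report; adds ids whose files should be deleted to toDelete.
  static List<Integer> reportableReduceIds(Map<Integer, BitSet> mapTrackers, List<Integer> toDelete) {
    List<Integer> report = new ArrayList<>();
    for (Map.Entry<Integer, BitSet> e : mapTrackers.entrySet()) {
      boolean deleteFile = e.getValue().cardinality() == 0; // no map outputs merged
      if (deleteFile) {
        toDelete.add(e.getKey());
      } else {
        report.add(e.getKey());
      }
    }
    return report;
  }
}
```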






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990772882


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -650,24 +666,28 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
 } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
   // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
   // empty MergeStatuses but cleanup the older shuffleMergeId files.
+  Map shuffleMergePartitions =
+  mergePartitionsInfo.shuffleMergePartitions;
   submitCleanupTask(() ->
-  closeAndDeleteOutdatedPartitions(
-  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+  closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions));
 } else {

Review Comment:
   Now the `finalizeShuffleMerge()` method updates the `AppShuffleMergePartitionsInfo` to an empty object, so doesn't that mean we cannot clean up the leaked files?






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990772196


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);

Review Comment:
   Updated.






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990772179


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java:
##
@@ -224,6 +224,12 @@ protected void handleMessage(
   } finally {
 responseDelayContext.stop();
   }
+} else if (msgObj instanceof RemoveShuffleMerge) {
+  RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
+  checkAuth(client, msg.appId);
+  logger.info("Removing shuffle merge data for application {} shuffle {} shuffleMerge {}",
+  msg.appId, msg.shuffleId, msg.shuffleMergeId);
+  mergeManager.removeShuffleMerge(msg.appId, msg.shuffleId, msg.shuffleMergeId);

Review Comment:
   Updated.






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-09-18 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r973864857


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1395,16 +1393,24 @@ private[spark] class DAGScheduler(
   }
 
   private def getAndSetShufflePushMergerLocations(stage: ShuffleMapStage): Seq[BlockManagerId] = {
-    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
-      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
-    if (mergerLocs.nonEmpty) {
-      stage.shuffleDep.setMergerLocs(mergerLocs)
+    stage.shuffleDep.synchronized {
+      val oldMergeLocs = stage.shuffleDep.getMergerLocs
+      if (oldMergeLocs.isEmpty) {
+        val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+          stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+        if (mergerLocs.nonEmpty) {
+          stage.shuffleDep.setMergerLocs(mergerLocs)
+          mapOutputTracker.registerShufflePushMergerLocations(
+            stage.shuffleDep.shuffleId, mergerLocs)
+        }
+        logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" +
+          s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" +
+          s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+        mergerLocs
+      } else {
+        oldMergeLocs
+      }
     }
-
-    logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" +
-      s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" +
-      s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
-    mergerLocs
   }

Review Comment:
   done
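The revised `getAndSetShufflePushMergerLocations` wraps a check-then-set in `stage.shuffleDep.synchronized`, so a caller that finds merger locations already set reuses them instead of asking the scheduler backend again. A minimal Java sketch of the same pattern, assuming a hypothetical `MergerLocsHolder` in place of `ShuffleDependency` and a `Supplier` in place of the scheduler-backend lookup; registration with the map output tracker is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the merger-location state held by a shuffle dependency.
final class MergerLocsHolder {
  private List<String> mergerLocs = Collections.emptyList();
  private int lookups = 0;  // how many times the lookup actually ran

  // Check-then-set under this object's monitor: only a caller that finds no stored
  // locations performs the lookup; later callers get the stored list back.
  synchronized List<String> getAndSetMergerLocs(Supplier<List<String>> lookup) {
    if (!mergerLocs.isEmpty()) {
      return mergerLocs;
    }
    lookups++;
    List<String> fresh = lookup.get();
    if (!fresh.isEmpty()) {
      // Store a defensive, read-only copy so callers cannot mutate shared state.
      mergerLocs = Collections.unmodifiableList(new ArrayList<>(fresh));
    }
    return fresh;
  }

  synchronized int lookupCount() {
    return lookups;
  }
}
```

Because an empty lookup result leaves the holder unset, a later call retries the lookup, which matches the diff's behavior when `mergerLocs` comes back empty.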






[GitHub] [spark] wankunde commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-09-18 Thread GitBox


wankunde commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r973864765


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Remove the merged data for a given shuffle.
+ * Returns {@link Boolean}
+ *
+ * @since 3.4.0
+ */
+public class RemoveShuffleMerge extends BlockTransferMessage {
+  public final String appId;
+  public final int shuffleId;
+
+  public RemoveShuffleMerge(String appId, int shuffleId) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+  }
+
+  @Override
+  protected Type type() {
+    return Type.REMOVE_SHUFFLE_MERGE;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId, shuffleId);
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+      .append("appId", appId)
+      .append("shuffleId", shuffleId)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof RemoveShuffleMerge) {
+      RemoveShuffleMerge o = (RemoveShuffleMerge) other;
+      return Objects.equal(appId, o.appId)
+        && shuffleId == o.shuffleId;

Review Comment:
   done
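`RemoveShuffleMerge` is a value-style protocol message: `equals` and `hashCode` must agree on the same fields, and the message has to survive an encode/decode round trip. The following is a dependency-free Java illustration using a hypothetical `RemoveMergeRequest` class. It uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, and the length-prefixed UTF-8 layout for `appId` is an assumption of this sketch, not a statement of Spark's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical sketch of a RemoveShuffleMerge-style message (not Spark's class).
final class RemoveMergeRequest {
  final String appId;
  final int shuffleId;

  RemoveMergeRequest(String appId, int shuffleId) {
    this.appId = appId;
    this.shuffleId = shuffleId;
  }

  // Assumed layout: int length + UTF-8 bytes of appId, then shuffleId as an int.
  int encodedLength() {
    return 4 + appId.getBytes(StandardCharsets.UTF_8).length + 4;
  }

  void encode(ByteBuffer buf) {
    byte[] bytes = appId.getBytes(StandardCharsets.UTF_8);
    buf.putInt(bytes.length);
    buf.put(bytes);
    buf.putInt(shuffleId);
  }

  static RemoveMergeRequest decode(ByteBuffer buf) {
    byte[] bytes = new byte[buf.getInt()];
    buf.get(bytes);
    String appId = new String(bytes, StandardCharsets.UTF_8);
    int shuffleId = buf.getInt();
    return new RemoveMergeRequest(appId, shuffleId);
  }

  @Override
  public boolean equals(Object other) {
    // instanceof already evaluates to false for null.
    if (!(other instanceof RemoveMergeRequest)) {
      return false;
    }
    RemoveMergeRequest o = (RemoveMergeRequest) other;
    return Objects.equals(appId, o.appId) && shuffleId == o.shuffleId;
  }

  @Override
  public int hashCode() {
    // Same fields as equals, so equal messages always share a hash code.
    return Objects.hash(appId, shuffleId);
  }

  @Override
  public String toString() {
    return "RemoveMergeRequest[appId=" + appId + ",shuffleId=" + shuffleId + "]";
  }
}
```

Since `instanceof` is false for `null`, the `other != null` guard in the diff's `equals` is redundant, though harmless.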



##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -364,8 +364,16 @@ class BlockManagerMasterEndpoint(
         }
       }.getOrElse(Seq.empty)
 
+    val removeShuffleMergeFromShuffleServicesFutures =

Review Comment:
   done
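`removeShuffleMergeFromShuffleServicesFutures` collects one asynchronous removal request per merger location. As a rough Java analogue (not Spark's code), the sketch below fans a hypothetical `removeOnHost` call out as one `CompletableFuture` per host. It treats a failed request as "nothing removed" (an assumption of this sketch) and counts successes once all requests finish.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

final class MergeCleanupSketch {
  // Hypothetical helper: one async removal per merger host; completes with the
  // number of hosts that reported a successful removal.
  static CompletableFuture<Long> removeFromMergers(
      List<String> mergerHosts,
      int shuffleId,
      BiPredicate<String, Integer> removeOnHost,
      Executor executor) {
    List<CompletableFuture<Boolean>> futures = mergerHosts.stream()
        .map(host -> CompletableFuture
            .supplyAsync(() -> removeOnHost.test(host, shuffleId), executor)
            // A failed request counts as "not removed" instead of failing the batch.
            .exceptionally(error -> false))
        .collect(Collectors.toList());
    return CompletableFuture
        .allOf(futures.toArray(new CompletableFuture<?>[0]))
        // All futures are complete here, so join() does not block.
        .thenApply(ignored -> futures.stream().filter(f -> f.join()).count());
  }
}
```

Mapping failures with `exceptionally` keeps one unreachable merger from failing the whole cleanup. Whether Spark handles merger-side failures the same way is not shown in this hunk.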


