dongjoon-hyun commented on a change in pull request #35085:
URL: https://github.com/apache/spark/pull/35085#discussion_r829628027



##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output tracker
+        // to find blocks served by the shuffle service on deallocated executors
         shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)

Review comment:
       Is this new pattern safe in all cases? Any chance of `FileNotFound`?
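
For illustration only, a toy sketch of the ordering rationale stated in the diff comment: if removal has to ask the output tracker where a shuffle's files live, unregistering first leaves nothing to look up. `ToyTracker` and `CleanupOrder` are hypothetical stand-ins, not Spark classes, and the sketch says nothing about whether the new order is safe against `FileNotFound`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the output tracker; not a Spark class.
final class ToyTracker {
  private val hostsByShuffle = mutable.Map.empty[Int, Seq[String]]

  def register(shuffleId: Int, hosts: Seq[String]): Unit =
    hostsByShuffle.update(shuffleId, hosts)

  def unregister(shuffleId: Int): Unit = {
    hostsByShuffle.remove(shuffleId)
    ()
  }

  def hostsFor(shuffleId: Int): Seq[String] =
    hostsByShuffle.getOrElse(shuffleId, Seq.empty)
}

object CleanupOrder {
  // Assumption: removal asks the tracker which hosts hold the shuffle files
  // and returns the hosts it would contact.
  def removeShuffle(tracker: ToyTracker, shuffleId: Int): Seq[String] =
    tracker.hostsFor(shuffleId)

  // Order used by the new code in the diff: remove, then unregister.
  def removeThenUnregister(tracker: ToyTracker, shuffleId: Int): Seq[String] = {
    val contacted = removeShuffle(tracker, shuffleId)
    tracker.unregister(shuffleId)
    contacted
  }

  // Order used by the old code: unregister, then remove.
  def unregisterThenRemove(tracker: ToyTracker, shuffleId: Int): Seq[String] = {
    tracker.unregister(shuffleId)
    removeShuffle(tracker, shuffleId)
  }
}
```

In this toy model the old order contacts no hosts, because the lookup happens after the entry is gone.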

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -654,6 +654,16 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.service.removeShuffle")
+      .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " +
+        "deallocated executors when the shuffle is no longer needed. Without this enabled, " +
+        "shuffle data on executors that are deallocated will remain on disk until the " +
+        "application ends.")
+      .version("3.3.0")

Review comment:
       We need to discuss this, @mridulm.
   If this feature is not urgent, I'd recommend keeping it on `master` only.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -654,6 +654,16 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.service.removeShuffle")
+      .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " +
+        "deallocated executors when the shuffle is no longer needed. Without this enabled, " +
+        "shuffle data on executors that are deallocated will remain on disk until the " +
+        "application ends.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)

Review comment:
       If we want to bring this to Apache Spark 3.3, we should at least change the default value to `false`.

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {

Review comment:
       In real data, there are cases where a shuffle index file exists without its data file.
   So, did we consider those cases in the upper layers where this method is invoked?
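
For illustration only, a minimal sketch of the kind of defensive check this question is about: report only the shuffle files that are actually on disk, so an index file without its data file is not treated as a complete pair. `ShuffleFileCheck`, `existingShuffleFiles`, and the `shuffle_<shuffleId>_<mapId>_0` file naming are assumptions made for the example, not the PR's `getBlocksForShuffle`.

```scala
import java.nio.file.{Files, Path}

// Hypothetical helper, not Spark API.
object ShuffleFileCheck {
  // Returns the index and data files for one map output that exist in `dir`.
  // The naming scheme below is an assumption for this sketch.
  def existingShuffleFiles(dir: Path, shuffleId: Int, mapId: Long): Seq[Path] = {
    val base = s"shuffle_${shuffleId}_${mapId}_0"
    Seq(dir.resolve(s"$base.index"), dir.resolve(s"$base.data"))
      .filter(p => Files.exists(p))
  }
}
```

A caller could compare the result against the expected two files and decide whether a missing data file should be logged or skipped.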

##########
File path: core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
##########
@@ -141,6 +141,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     assert(attemptId.equals("1"))
   }
 
+  test("SPARK-37618: Sub dirs are group writable") {
+    val conf = testConf.clone
+    conf.set("spark.local.dir", rootDirs)
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.shuffle.service.removeShufle", "true")

Review comment:
       We need a negative test case for the `spark.shuffle.service.removeShufle` config.
   
   As the PR currently stands, the disabled path has no test coverage because the default was `true`.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -299,15 +299,15 @@ public void onSuccess(ByteBuffer response) {
           BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
           numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks);
         } catch (Throwable t) {
-          logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+          logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) +

Review comment:
       Why is this changed? Shuffle files still belong to an RDD such as `ShuffledRowRDD`, don't they?

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {

Review comment:
       Historically, worker decommissioning failed with a `FileNotFound` exception because it did not account for what actually happens on disk.
   So, I want to make sure that the upper layer calling this method is safe.

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {

Review comment:
       Do we have test coverage for the case you mentioned?
   > In that case the external shuffle service will just log unable to delete 
the file and continue. Is there anywhere to actually know that information?

##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output tracker
+        // to find blocks served by the shuffle service on deallocated executors
         shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)

Review comment:
       @Kimahriman,
   - Logically, this is counterintuitive, isn't it? The new code is weaker than before.
   - I guess you are describing experience from your own environment, which is usually not Apache Spark master or 3.3. Which forked version are you using now with this patch?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


