Ngone51 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r745303626



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2193,6 +2193,30 @@ package object config {
       // with small MB sized chunk of data.
       .createWithDefaultString("3m")
 
+  private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_THREADS =
+    ConfigBuilder("spark.shuffle.push.merge.finalizeThreads")
+      .doc("Number of threads used by driver to finalize shuffle merge. Since 
it could" +
+        " potentially take seconds for a large shuffle to finalize, having 
multiple threads helps" +
+        " driver to handle concurrent shuffle merge finalize requests when 
push-based" +
+        " shuffle is enabled.")
+      .intConf
+      .createWithDefault(3)
+
+  private[spark] val PUSH_BASED_SHUFFLE_SIZE_MIN_SHUFFLE_SIZE_TO_WAIT =
+    ConfigBuilder("spark.shuffle.push.minShuffleSizeToWait")
+      .doc("Driver will wait for merge finalization to complete only if total 
shuffle size is" +
+        " more than this threshold. If total shuffle size is less, driver will 
immediately" +
+        " finalize the shuffle output")
+      .bytesConf(ByteUnit.BYTE)

Review comment:
       Please add `.version("3.3.0")` to this config entry.

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -168,6 +168,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           }
         }
 
+      case ShufflePushCompletion(shuffleId, mapIndex) =>
+        scheduler.shufflePushCompleted(shuffleId, mapIndex)

Review comment:
       Shall we call `scheduler.dagScheduler.shufflePushCompleted(shuffleId, 
mapIndex)` directly? Then we wouldn't need `shufflePushCompleted()` in 
`TaskSchedulerImpl`.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2193,6 +2193,30 @@ package object config {
       // with small MB sized chunk of data.
       .createWithDefaultString("3m")
 
+  private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_THREADS =
+    ConfigBuilder("spark.shuffle.push.merge.finalizeThreads")
+      .doc("Number of threads used by driver to finalize shuffle merge. Since 
it could" +
+        " potentially take seconds for a large shuffle to finalize, having 
multiple threads helps" +
+        " driver to handle concurrent shuffle merge finalize requests when 
push-based" +
+        " shuffle is enabled.")
+      .intConf
+      .createWithDefault(3)
+
+  private[spark] val PUSH_BASED_SHUFFLE_SIZE_MIN_SHUFFLE_SIZE_TO_WAIT =
+    ConfigBuilder("spark.shuffle.push.minShuffleSizeToWait")
+      .doc("Driver will wait for merge finalization to complete only if total 
shuffle size is" +
+        " more than this threshold. If total shuffle size is less, driver will 
immediately" +
+        " finalize the shuffle output")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("500m")
+
+  private[spark] val PUSH_BASED_SHUFFLE_MIN_PUSH_RATIO =
+    ConfigBuilder("spark.shuffle.push.minPushRatio")
+      .doc("Fraction of map partitions that should be push complete before 
driver starts" +
+        " shuffle merge finalization during push based shuffle")
+      .doubleConf

Review comment:
       Please add `.version("3.3.0")` to this config entry.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2193,6 +2193,30 @@ package object config {
       // with small MB sized chunk of data.
       .createWithDefaultString("3m")
 
+  private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_THREADS =
+    ConfigBuilder("spark.shuffle.push.merge.finalizeThreads")
+      .doc("Number of threads used by driver to finalize shuffle merge. Since 
it could" +
+        " potentially take seconds for a large shuffle to finalize, having 
multiple threads helps" +
+        " driver to handle concurrent shuffle merge finalize requests when 
push-based" +
+        " shuffle is enabled.")
+      .intConf

Review comment:
       Please add `.version("3.3.0")` to this config entry.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to