otterc commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r539661632
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -49,7 +50,7 @@ import org.apache.spark.util._
*
* All public methods of this class are thread-safe.
*/
-private class ShuffleStatus(numPartitions: Int) extends Logging {
+private class ShuffleStatus(numPartitions: Int, numReducers: Int) extends Logging {
Review comment:
These changes should not be part of this PR, right? You can just include
the method you need here for `DAGScheduler`.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1988,6 +1988,23 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
+ ConfigBuilder("spark.shuffle.push.based.merge.results.timeout")
Review comment:
Nit: remove the "based", just like the previous configs.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1988,6 +1988,23 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
+ ConfigBuilder("spark.shuffle.push.based.merge.results.timeout")
+    .doc("Specify the max amount of time DAGScheduler waits for the merge results from " +
+      "all remote shuffle services for a given shuffle. DAGScheduler will start to submit " +
+      "following stages if not all results are received within the timeout.")
+ .stringConf
+ .createWithDefault("10s")
+
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT =
+ ConfigBuilder("spark.shuffle.push.based.merge.finalize.timeout")
Review comment:
Also add version.
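
Applying this suggestion, the config definition would gain a `.version(...)` call. A sketch of what that might look like; the `"3.1.0"` version string is purely illustrative and not confirmed anywhere in this thread:

```scala
// Hypothetical revision of the config from the diff above; the version
// string "3.1.0" is a placeholder for whichever release this PR targets.
private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
  ConfigBuilder("spark.shuffle.push.based.merge.results.timeout")
    .doc("Specify the max amount of time DAGScheduler waits for the merge results from " +
      "all remote shuffle services for a given shuffle. DAGScheduler will start to submit " +
      "following stages if not all results are received within the timeout.")
    .version("3.1.0")
    .stringConf
    .createWithDefault("10s")
```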
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1988,6 +1988,23 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
+ ConfigBuilder("spark.shuffle.push.based.merge.results.timeout")
Review comment:
Also add version.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1988,6 +1988,23 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
+ ConfigBuilder("spark.shuffle.push.based.merge.results.timeout")
+    .doc("Specify the max amount of time DAGScheduler waits for the merge results from " +
+      "all remote shuffle services for a given shuffle. DAGScheduler will start to submit " +
+      "following stages if not all results are received within the timeout.")
+ .stringConf
+ .createWithDefault("10s")
+
+ private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT =
+ ConfigBuilder("spark.shuffle.push.based.merge.finalize.timeout")
Review comment:
Nit: same here, remove the "based" just like the previous configs.
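
With the "based" dropped to match the earlier push-shuffle config names, the two keys suggested by the reviewer would read as follows; this is a sketch of the suggestion, not necessarily the names that were finally merged:

```scala
// Hypothetical renamed keys per the review comments; the rest of each
// ConfigBuilder chain would stay as in the diff above.
ConfigBuilder("spark.shuffle.push.merge.results.timeout")
ConfigBuilder("spark.shuffle.push.merge.finalize.timeout")
```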
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]