tgravescs commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r520812097
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,42 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+ ConfigBuilder("spark.shuffle.push.enabled")
+ .doc("Set to 'true' to enable push based shuffle on the client side and
this works in" +
+ "conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl" +
+ "which needs to be set with the appropriate" +
+ "org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based" +
+ "shuffle to be enabled")
+ .version("3.1.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+ ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+ .doc("Maximum number of shuffle push mergers locations cached for push
based shuffle." +
+ "Currently Shuffle push merger locations are nothing but shuffle
services where an" +
Review comment:
nit: Shuffle -> shuffle
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,42 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+ ConfigBuilder("spark.shuffle.push.enabled")
+ .doc("Set to 'true' to enable push based shuffle on the client side and
this works in" +
+ "conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl" +
+ "which needs to be set with the appropriate" +
+ "org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based" +
+ "shuffle to be enabled")
+ .version("3.1.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+ ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+ .doc("Maximum number of shuffle push mergers locations cached for push
based shuffle." +
+ "Currently Shuffle push merger locations are nothing but shuffle
services where an" +
+ "executor is launched in the case of Push based shuffle.")
+ .version("3.1.0")
+ .intConf
+ .createWithDefault(500)
+
+ private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
+ ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
+ .doc("Minimum percentage of shuffle push mergers locations required to
enable push based" +
+ "shuffle for the stage with respect to number of partitions of the
child stage. This is" +
+ " the number of unique Node Manager locations needed to enable push
based shuffle.")
Review comment:
Perhaps we can reword a bit: The minimum number of shuffle merger locations required to enable push based shuffle for a stage. This is specified as a ratio of the number of partitions in the child stage. For example, a reduce stage which has 100 partitions and uses the default value 0.05 requires at least 5 unique merger locations to enable push based shuffle. Merger locations are currently defined as external shuffle services.
   You don't have to use the exact text here, just thought something like this might be a bit clearer.
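   To make the suggested wording concrete, here is a minimal, self-contained sketch of the ratio semantics; the object and helper names are hypothetical, not code from this PR:

```scala
// Hypothetical illustration of spark.shuffle.push.mergersMinThresholdRatio
// semantics, assuming the ratio is applied to the child stage's partition
// count and rounded up.
object MergerRatioExample {
  def requiredMergers(numPartitions: Int, ratio: Double): Int =
    math.ceil(numPartitions * ratio).toInt

  def main(args: Array[String]): Unit = {
    // A reduce stage with 100 partitions and the default ratio 0.05
    // needs at least 5 unique merger locations.
    println(requiredMergers(100, 0.05)) // 5
  }
}
```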
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
execCores.map(cores =>
properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
}
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with dynamic resource allocation kicking in and significantly reducing the number
+   * of available active executors, we would still be able to get sufficient shuffle service
+   * locations for block push/merge by getting the historical locations of past executors.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
Review comment:
can we add the JIRA number here, assuming that change is coming later?
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -526,6 +548,8 @@ class BlockManagerMasterEndpoint(
     blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
       maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)
+
+    addMergerLocation(id)
Review comment:
seems like we can skip this if push based shuffle is disabled or the external shuffle service is disabled.
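   A minimal sketch of the guard being suggested. The class and method names are illustrative stand-ins for the BlockManagerMasterEndpoint wiring, with the two flags standing in for spark.shuffle.push.enabled and spark.shuffle.service.enabled:

```scala
// Illustrative guard, not the actual BlockManagerMasterEndpoint change.
class MergerLocationRegistry(
    pushBasedShuffleEnabled: Boolean,
    externalShuffleServiceEnabled: Boolean) {

  // Merger locations only matter when both push based shuffle and the
  // external shuffle service are enabled.
  private val trackMergerLocations: Boolean =
    pushBasedShuffleEnabled && externalShuffleServiceEnabled

  def onRegisterBlockManager(host: String, port: Int): Unit = {
    // ... existing registration bookkeeping ...
    if (trackMergerLocations) {
      addMergerLocation(host, port)
    }
  }

  private def addMergerLocation(host: String, port: Int): Unit = {
    // cache the shuffle service location for later block push/merge
    println(s"caching merger location $host:$port")
  }
}
```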
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,42 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+ ConfigBuilder("spark.shuffle.push.enabled")
+ .doc("Set to 'true' to enable push based shuffle on the client side and
this works in" +
Review comment:
nit: need to add spaces at the end of each line before the closing ", this goes for the configs below as well
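   To illustrate the nit: without a trailing space before each closing quote, the concatenated fragments run words together. A tiny self-contained example, with strings shortened from the actual doc text:

```scala
object DocStringSpacing {
  // Missing trailing space: "and" and "this" fuse into "andthis".
  val broken: String =
    "enable push based shuffle on the client side and" +
      "this works in conjunction with the server side flag"

  // Trailing space before the closing quote keeps the sentence readable.
  val fixed: String =
    "enable push based shuffle on the client side and " +
      "this works in conjunction with the server side flag"

  def main(args: Array[String]): Unit = {
    println(broken) // ...client side andthis works...
    println(fixed)  // ...client side and this works...
  }
}
```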
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,42 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+ ConfigBuilder("spark.shuffle.push.enabled")
+ .doc("Set to 'true' to enable push based shuffle on the client side and
this works in" +
+ "conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl" +
+ "which needs to be set with the appropriate" +
+ "org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based" +
+ "shuffle to be enabled")
+ .version("3.1.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+ ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+ .doc("Maximum number of shuffle push mergers locations cached for push
based shuffle." +
+ "Currently Shuffle push merger locations are nothing but shuffle
services where an" +
+ "executor is launched in the case of Push based shuffle.")
+ .version("3.1.0")
+ .intConf
+ .createWithDefault(500)
+
+ private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
+ ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
+ .doc("Minimum percentage of shuffle push mergers locations required to
enable push based" +
+ "shuffle for the stage with respect to number of partitions of the
child stage. This is" +
+ " the number of unique Node Manager locations needed to enable push
based shuffle.")
+ .version("3.1.0")
+ .doubleConf
+ .createWithDefault(0.05)
+
+ private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
+ ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
+ .doc("Minimum static number of of shuffle push mergers locations should
be available in" +
+ " order to enable push based shuffle for a stage. Note this config
works in" +
+ " conjunction with spark.shuffle.push.mergersMinThresholdRatio")
Review comment:
we should specify how this works in conjunction with the ratio config -> the max of the two values is used
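   A hedged sketch of the interaction being asked for in the doc, assuming the two configs combine by taking the larger requirement; the helper name and standalone form are illustrative, not this PR's code:

```scala
// Illustrative only: the effective minimum number of merger locations is the
// max of the static threshold and ratio * number of child-stage partitions.
object MergerThresholdInteraction {
  def minMergersNeeded(
      numPartitions: Int,
      minThresholdRatio: Double, // spark.shuffle.push.mergersMinThresholdRatio
      minStaticThreshold: Int    // spark.shuffle.push.mergersMinStaticThreshold
  ): Int =
    math.max(math.ceil(numPartitions * minThresholdRatio).toInt, minStaticThreshold)

  def main(args: Array[String]): Unit = {
    println(minMergersNeeded(1000, 0.05, 5)) // ratio dominates: 50
    println(minMergersNeeded(40, 0.05, 5))   // static floor dominates: 5
  }
}
```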
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]