Ngone51 commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r521957957
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,50 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+ ConfigBuilder("spark.shuffle.push.enabled")
+ .doc("Set to 'true' to enable push based shuffle on the client side and
this works in " +
+ "conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl " +
+ "which needs to be set with the appropriate " +
+ "org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based " +
+ "shuffle to be enabled")
+ .version("3.1.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+ ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+ .doc("Maximum number of shuffle push mergers locations cached for push
based shuffle. " +
+ "Currently shuffle push merger locations are nothing but shuffle
services where an " +
+ "executor is launched in the case of Push based shuffle.")
+ .version("3.1.0")
+ .intConf
+ .createWithDefault(500)
+
+ private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
+ ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
+ .doc("The minimum number of shuffle merger locations required to enable
push based " +
+ "shuffle for a stage. This is specified as a ratio of the number of
partitions in " +
+ "the child stage. For example, a reduce stage which has 100 partitions
and uses the " +
+ "default value 0.05 requires at least 5 unique merger locations to
enable push based " +
+ "shuffle. Merger locations are currently defined as external shuffle
services.")
+ .version("3.1.0")
+ .doubleConf
+ .createWithDefault(0.05)
+
+ private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
+ ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
+ .doc("The static threshold for number of shuffle push mergers locations
should be " +
+ " available in order to enable push based shuffle for a stage. Note
this config " +
+ "works in conjunction with
spark.shuffle.push.mergersMinThresholdRatio. Maximum " +
+ "of spark.shuffle.push.mergersMinStaticThreshold and " +
+ "spark.shuffle.push.mergersMinThresholdRatio ratio number of mergers
needed to " +
+ "enable push based shuffle for a stage. For eg: with 1000 partitions
for the child " +
+ "stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
Review comment:
Please use `${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}` instead
of the hard-coded config name.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,50 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+ ConfigBuilder("spark.shuffle.push.enabled")
+ .doc("Set to 'true' to enable push based shuffle on the client side and
this works in " +
+ "conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl " +
+ "which needs to be set with the appropriate " +
+ "org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based " +
+ "shuffle to be enabled")
+ .version("3.1.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+ ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+ .doc("Maximum number of shuffle push mergers locations cached for push
based shuffle. " +
+ "Currently shuffle push merger locations are nothing but shuffle
services where an " +
+ "executor is launched in the case of Push based shuffle.")
+ .version("3.1.0")
+ .intConf
+ .createWithDefault(500)
+
+ private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
+ ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
+ .doc("The minimum number of shuffle merger locations required to enable
push based " +
+ "shuffle for a stage. This is specified as a ratio of the number of
partitions in " +
+ "the child stage. For example, a reduce stage which has 100 partitions
and uses the " +
+ "default value 0.05 requires at least 5 unique merger locations to
enable push based " +
+ "shuffle. Merger locations are currently defined as external shuffle
services.")
+ .version("3.1.0")
+ .doubleConf
+ .createWithDefault(0.05)
+
+ private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
+ ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
+ .doc("The static threshold for number of shuffle push mergers locations
should be " +
+ " available in order to enable push based shuffle for a stage. Note
this config " +
+ "works in conjunction with
spark.shuffle.push.mergersMinThresholdRatio. Maximum " +
+ "of spark.shuffle.push.mergersMinStaticThreshold and " +
+ "spark.shuffle.push.mergersMinThresholdRatio ratio number of mergers
needed to " +
+ "enable push based shuffle for a stage. For eg: with 1000 partitions
for the child " +
+ "stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
+ "spark.shuffle.push.mergersMinThresholdRatio set to 0.05, we would
need atleast 50 " +
Review comment:
Ditto — please reference the config key (e.g.
`${SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD.key}` /
`${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}`) instead of the
hard-coded names here as well.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]