Victsm commented on a change in pull request #30312:
URL: https://github.com/apache/spark/pull/30312#discussion_r530575752
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1992,4 +1992,32 @@ package object config {
.version("3.1.0")
.doubleConf
.createWithDefault(5)
+
+  private[spark] val SHUFFLE_NUM_PUSH_THREADS =
+    ConfigBuilder("spark.shuffle.push.numPushThreads")
+      .doc("Specify the number of threads in the block pusher pool. These threads assist " +
+        "in creating connections and pushing blocks to remote shuffle services when push " +
+        "based shuffle is enabled. By default, the threadpool size is equal to the number " +
+        "of cores.")
+      .version("3.1.0")
+      .intConf
+      .createOptional
+
+  private[spark] val SHUFFLE_MAX_BLOCK_SIZE_TO_PUSH =
+    ConfigBuilder("spark.shuffle.push.maxBlockSizeToPush")
+      .doc("The max size of an individual block to push to the remote shuffle services when " +
+        "push based shuffle is enabled. Blocks larger than this threshold are not pushed.")
+      .version("3.1.0")
+      .bytesConf(ByteUnit.KiB)
+      .createWithDefaultString("800k")
Review comment:
Agree that 1MB as the default value is perhaps more appropriate.
We are currently achieving two things with this configuration:
1. It serves as a threshold to determine whether a block is large enough to not trigger a potential disk throughput issue.
2. It also serves as a static threshold to determine whether a block is potentially coming from a skewed partition, which would likely be much larger than the other blocks generated by the same mapper.
For the 1st need, a static configuration is perhaps sufficient.
For the 2nd need, I'm not sure whether we should also look at the distribution of block sizes for a given mapper, which is known to the ShuffleBlockPusher at the beginning of `initiateBlockPush`.
If the total size of the skewed partition is not too large (a few GB rather than tens or hundreds of GB), it's probably still fine to merge it.
If the skewed partition is too large, merging it into a single shuffle service is probably not ideal, given the risk of overloading that shuffle service.
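
To make the idea above concrete, here is a rough sketch (all names and the 4 GiB cutoff are illustrative assumptions, not actual `ShuffleBlockPusher` fields or existing Spark configs): derive a per-mapper statistic from the block-size distribution available at the start of `initiateBlockPush`, and only bypass merging when a reduce partition's total size is genuinely large.

```scala
// Hypothetical sketch; blockSizes, reducePartitionBytes, and the 4 GiB
// cutoff are illustrative assumptions, not existing Spark code or configs.
object PushDecisionSketch {
  /** Median block size produced by one mapper across its reduce partitions. */
  def medianBlockSize(blockSizes: Seq[Long]): Long = {
    require(blockSizes.nonEmpty, "mapper produced no shuffle blocks")
    val sorted = blockSizes.sorted
    sorted(sorted.size / 2)
  }

  /**
   * A mildly skewed partition (a few GiB in total) can still be merged; a very
   * large one should bypass merging so that a single shuffle service is not
   * overloaded with the entire partition.
   */
  def shouldStillMerge(
      reducePartitionBytes: Long,
      maxMergedPartitionBytes: Long = 4L << 30): Boolean =
    reducePartitionBytes <= maxMergedPartitionBytes
}
```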
This is also where push-based shuffle and AQE skew join intersect.
Since the blocks get merged into a merged shuffle partition in random order, that merged shuffle partition cannot be further divided for AQE skew join.
Since push-based shuffle maintains both the merged shuffle files and the original shuffle files, we currently bypass skewed partitions in push-based shuffle and let AQE handle these partitions.
This is part of #30480.
AQE skew join uses a skew factor to determine which partitions are skewed.
I'm wondering whether we should take a similar approach here to decide which blocks should be considered as potentially coming from a skewed partition.
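
A sketch of what such an AQE-style check might look like at the block level (the factor value and these names are assumptions for illustration, not existing Spark code): flag a block as potentially skewed when it exceeds its mapper's median block size by some factor, in addition to the static size cap.

```scala
// Illustrative sketch of an AQE-style skew factor applied per block;
// skewFactor and these method names are assumptions, not existing Spark APIs.
object SkewedBlockDetector {
  /** A block is a skew candidate if it dwarfs the mapper's median block size. */
  def isLikelySkewed(
      blockSize: Long,
      blockSizes: Seq[Long],
      skewFactor: Double = 5.0): Boolean = {
    val sorted = blockSizes.sorted
    val median = sorted(sorted.size / 2)
    blockSize > (median * skewFactor).toLong
  }

  /** Push only blocks that pass both the static cap and the skew heuristic. */
  def shouldPush(
      blockSize: Long,
      blockSizes: Seq[Long],
      maxBlockSizeToPush: Long): Boolean =
    blockSize <= maxBlockSizeToPush && !isLikelySkewed(blockSize, blockSizes)
}
```

Compared with a purely static `maxBlockSizeToPush`, this would adapt to each mapper's output: a 900 KB block among 100 KB siblings looks skewed, while the same block among 800 KB siblings does not.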
@tgravescs @attilapiros @Ngone51 @jiangxb1987 @mridulm @dongjoon-hyun want to know your thoughts on this as well.