Victsm commented on a change in pull request #30312:
URL: https://github.com/apache/spark/pull/30312#discussion_r530575752



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1992,4 +1992,32 @@ package object config {
       .version("3.1.0")
       .doubleConf
       .createWithDefault(5)
+
+  private[spark] val SHUFFLE_NUM_PUSH_THREADS =
+    ConfigBuilder("spark.shuffle.push.numPushThreads")
+      .doc("Specify the number of threads in the block pusher pool. These threads assist " +
+        "in creating connections and pushing blocks to remote shuffle services when push based " +
+        "shuffle is enabled. By default, the threadpool size is equal to the number of cores.")
+      .version("3.1.0")
+      .intConf
+      .createOptional
+
+  private[spark] val SHUFFLE_MAX_BLOCK_SIZE_TO_PUSH =
+    ConfigBuilder("spark.shuffle.push.maxBlockSizeToPush")
+      .doc("The max size of an individual block to push to the remote shuffle services when push " +
+        "based shuffle is enabled. Blocks larger than this threshold are not pushed.")
+      .version("3.1.0")
+      .bytesConf(ByteUnit.KiB)
+      .createWithDefaultString("800k")

Review comment:
       Agree that 1MB is perhaps a more appropriate default value.
   We are currently achieving two things with this configuration:
   
   1. It serves as a threshold to determine whether a block is large enough not to trigger a potential disk throughput issue.
   2. It also serves as a static threshold to determine whether a block is potentially coming from a skewed partition, since such a block would likely be much larger than the other blocks generated by the current mapper (see the sketch below).
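
   A minimal paraphrase of the single static check that currently serves both needs, going only by the config's doc string above (the object and method names are illustrative, not actual Spark code):

```scala
object StaticPushCheckSketch {
  // Illustrative only: per the config doc above, blocks larger than
  // spark.shuffle.push.maxBlockSizeToPush are simply not pushed for merging.
  def shouldPush(blockSize: Long, maxBlockSizeToPush: Long): Boolean =
    blockSize <= maxBlockSizeToPush
}
```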
   
   For the 1st need, a static configuration is perhaps sufficient.
   For the 2nd need, I'm not sure whether we should also look at the distribution of the block sizes for a given mapper, which is known to the ShuffleBlockPusher at the beginning of `initiateBlockPush` (sketched below).
   If the total size of the skewed partition is not way too large (a few GB instead of 10s or 100s of GB), it's probably still fine to merge it.
   If the skewed partition size is too large, merging it into a single shuffle service is probably not ideal, given the possibility of overloading that shuffle service.
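
   To make that concrete, here is a minimal sketch of summarizing the distribution, assuming the pusher can see the per-partition block sizes for the current map task; the names below are illustrative, not existing Spark APIs:

```scala
object BlockSizeStatsSketch {
  // Illustrative only: summarize the block sizes a mapper produced, so a
  // per-mapper dynamic push threshold could be derived from the distribution
  // instead of relying solely on the static
  // spark.shuffle.push.maxBlockSizeToPush value.
  def blockSizeStats(partitionLengths: Array[Long]): (Long, Long) = {
    val nonEmpty = partitionLengths.filter(_ > 0)
    if (nonEmpty.isEmpty) {
      (0L, 0L)
    } else {
      val sorted = nonEmpty.sorted
      // (median block size, total bytes written by this mapper)
      (sorted(sorted.length / 2), nonEmpty.sum)
    }
  }
}
```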
   
   This is also where push-based shuffle and AQE skew join intersect.
   Since the blocks get merged into a merged shuffle partition in random order, that merged shuffle partition cannot be further divided for AQE skew join.
   Since push-based shuffle maintains both the merged shuffle files and the original shuffle files, we currently bypass skewed partitions in push-based shuffle and let AQE handle them.
   This is part of #30480.
   AQE skewed join uses a skew factor to determine which partitions are skewed.
   I'm wondering whether we should take a similar approach here to decide which blocks should be considered as potentially coming from a skewed partition.
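
   As a rough sketch of that idea (hypothetical names; the default factor only mirrors the spirit of `spark.sql.adaptive.skewJoin.skewedPartitionFactor`, and a dedicated push-side config would presumably be needed):

```scala
object SkewedBlockHeuristicSketch {
  // Illustrative only: flag a block as potentially coming from a skewed
  // partition when it is both above the static push threshold and several
  // times larger than the median block this mapper produced, similar in
  // spirit to how AQE skewed join detects skewed partitions.
  def isLikelySkewedBlock(
      blockSize: Long,
      partitionLengths: Array[Long],
      staticMaxSizeToPush: Long,
      skewFactor: Double = 5.0): Boolean = {
    val nonEmpty = partitionLengths.filter(_ > 0)
    if (nonEmpty.isEmpty) {
      false
    } else {
      val sorted = nonEmpty.sorted
      val median = sorted(sorted.length / 2)
      blockSize > staticMaxSizeToPush && blockSize > (median * skewFactor).toLong
    }
  }
}
```

   With something like this, the static config would mostly cover the 1st need (disk throughput), while the relative check covers the 2nd need (skew detection).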
   
   @tgravescs @attilapiros @Ngone51 @jiangxb1987 @mridulm @dongjoon-hyun I would like to hear your thoughts on this as well.



