Victsm commented on a change in pull request #30062:
URL: https://github.com/apache/spark/pull/30062#discussion_r511141958
##########
File path: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
##########
@@ -363,4 +363,26 @@ public boolean useOldFetchProtocol() {
return conf.getBoolean("spark.shuffle.useOldFetchProtocol", false);
}
+  /**
+   * The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during
+   * push-based shuffle.
+   * A merged shuffle file consists of multiple small shuffle blocks. Fetching the
+   * complete merged shuffle file in a single response increases the memory requirements for the
+   * clients. Instead of serving the entire merged file, the shuffle service serves the
+   * merged file in `chunks`. A `chunk` constitutes few shuffle blocks in entirety and this
+   * configuration controls how big a chunk can get. A corresponding index file for each merged
+   * shuffle file will be generated indicating chunk boundaries.
+   */
+  public int minChunkSizeInMergedShuffleFile() {
+    return Ints.checkedCast(JavaUtils.byteStringAsBytes(
+      conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
Review comment:
If it's not static, we would make it a parameter passed during initial executor registration.
This would likely mean introducing a new registration RPC, to maintain backward compatibility with the existing RegisterExecutor RPC, and making this chunk size a global config for all shuffles of that application.
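Purely for illustration, a minimal sketch of what such a new registration message might carry; the class name, fields, and wire format are hypothetical and not part of this PR:
```java
// Hypothetical message for a new registration RPC that carries the negotiated
// chunk size once per application, kept separate from the existing
// RegisterExecutor RPC so that older shuffle services keep working.
// Name, fields, and encoding are assumptions, not part of this PR.
public class RegisterExecutorForPushShuffle {
  public final String appId;
  public final String execId;
  // Chunk size for merged shuffle files, in bytes, fixed for the whole application.
  public final int minChunkSizeInMergedShuffleFile;

  public RegisterExecutorForPushShuffle(
      String appId,
      String execId,
      int minChunkSizeInMergedShuffleFile) {
    this.appId = appId;
    this.execId = execId;
    this.minChunkSizeInMergedShuffleFile = minChunkSizeInMergedShuffleFile;
  }
}
```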
In terms of performance, we don't see much impact as long as this size is set to something reasonable, in the few-MB range.
If it is set too large, say above 5 MB, it could have slight memory implications when the other parameters such as `maxBytesInFlight`, `maxReqsInFlight`, and `maxBlocksInFlightPerAddress` are not adjusted accordingly.
If an executor runs multiple reducers concurrently, each fetching chunks from a merged shuffle partition, then these reducers potentially need to fetch hundreds or even 1000+ MB-sized blocks.
This is a very different block-fetching pattern compared with fetching the original, unmerged blocks.
An inappropriate setting for `maxBytesInFlight`, `maxReqsInFlight`, and `maxBlocksInFlightPerAddress` could thus lead to more data being fetched concurrently, increasing memory consumption on the client side.
We mitigated this by reducing the default chunk size to 2 MB and by adjusting our default values for the other shuffle-related parameters.
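To make the interplay concrete, here is a minimal tuning sketch. It assumes the config key introduced in this PR plus the standard `spark.reducer.*` keys that back `maxBytesInFlight`, `maxReqsInFlight`, and `maxBlocksInFlightPerAddress`; the values are illustrative only, not recommended defaults:
```java
import org.apache.spark.SparkConf;

public class PushShuffleFetchTuning {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        // Server side: how large a chunk of a merged shuffle file may grow (PR default: 2m).
        .set("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")
        // Client side: total size of in-flight fetch requests per reducer (maxBytesInFlight).
        .set("spark.reducer.maxSizeInFlight", "48m")
        // Client side: number of outstanding fetch requests per reducer (maxReqsInFlight).
        .set("spark.reducer.maxReqsInFlight", "256")
        // Client side: blocks fetched concurrently from one address (maxBlocksInFlightPerAddress).
        .set("spark.reducer.maxBlocksInFlightPerAddress", "64");
    System.out.println(conf.toDebugString());
  }
}
```
The point is only that a larger chunk size should be paired with tighter client-side fetch limits, so that many concurrent reducers fetching MB-sized chunks do not blow up client memory.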