tgravescs commented on a change in pull request #25907: [SPARK-29206][SHUFFLE]
Make number of shuffle server threads a multiple of number of chunk fetch
handler threads.
URL: https://github.com/apache/spark/pull/25907#discussion_r328663291
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
##########
@@ -111,8 +111,30 @@ public int numConnectionsPerPeer() {
/** Requested maximum length of the queue of incoming connections. Default is 64. */
public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, 64); }
- /** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
- public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }
+ /**
+ * The configured ratio between number of server threads and number of chunk fetch handler
+ * threads. Default to 1, which sets the size of both thread pools to be equal. Number of
+ * server threads needs to be a multiple of this ratio. See SPARK-29206.
+ */
+ private int getChunkFetchHandlerThreadsRatio() {
+ return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsRatio", 1);
+ }
+
+ /**
+ * Number of threads used in the server thread pool. Default to 0, which is 2x#cores.
+ * If spark.shuffle.server.chunkFetchHandlerThreadsRatio is configured, and the Netty server
+ * is for shuffle, then the actual # of server threads will round up to the nearest int that
+ * is a multiple of the configured ratio.
+ */
+ public int serverThreads() {
+ int configuredServerThreads = conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0);
+ if (this.getModuleName().equalsIgnoreCase("shuffle")) {
+ int chunkFetchHandlerThreadsRatio = getChunkFetchHandlerThreadsRatio();
+ return (int) Math.ceil(configuredServerThreads / (chunkFetchHandlerThreadsRatio * 1.0));
Review comment:
I understand and agree that if you don't set anything, the default is the
same. But in the existing code, before this change, if I don't set the server
threads and I set spark.shuffle.server.chunkFetchHandlerThreadsPercent=80, the
code below applies it (assuming 16 available processors; logic below in lines
337-341):
int chunkFetchHandlerThreadsPercent = 80;
int threads = 2 * NettyRuntime.availableProcessors(); // = 32
return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0)); // = 26
Thus there are 26 chunk fetch handler threads and 32 server threads. Your patch
doesn't preserve this behavior: with the server threads unset,
configuredServerThreads is 0 and Math.ceil(0 / ratio) is still 0, so the ratio
has no effect. The only way for spark.shuffle.server.chunkFetchHandlerThreadsRatio
to apply is to also set the server threads explicitly.
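To make the comparison concrete, here is a minimal standalone sketch that reproduces both computations outside Spark. It is not the actual TransportConf code: the class and method names are hypothetical, and `availableProcessors` is passed in as a parameter standing in for `NettyRuntime.availableProcessors()`.

```java
// Hypothetical standalone reproduction of the two thread-count computations
// discussed in the review; not the actual TransportConf code.
final class ThreadCountComparison {

  // Old behavior: with server threads unset, the chunk fetch handler pool is
  // sized as a percentage of the default 2 * #cores.
  static int oldChunkFetchHandlerThreads(int availableProcessors, int percent) {
    int threads = 2 * availableProcessors;
    return (int) Math.ceil(threads * (percent / 100.0));
  }

  // Patch behavior as quoted in the diff (shuffle module only): the configured
  // value is divided by the ratio. With the default of 0 the result is still 0,
  // which means "use the default of 2 * #cores", so the ratio has no effect.
  static int patchedServerThreads(int configuredServerThreads, int ratio) {
    return (int) Math.ceil(configuredServerThreads / (ratio * 1.0));
  }

  public static void main(String[] args) {
    System.out.println(oldChunkFetchHandlerThreads(16, 80)); // 26
    System.out.println(patchedServerThreads(0, 2));          // 0
  }
}
```

With 16 processors the old path yields 26 chunk fetch handler threads out of 32 server threads, while the patched path returns 0 for any ratio when server threads are not set.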
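Again, you also need to fix your rounding code above: you are missing the
multiply. Math.ceil(configuredServerThreads / ratio) only divides; rounding up to
the nearest multiple of the ratio requires multiplying that result by the ratio.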
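A minimal sketch of the rounding the comment asks for, i.e. ceil(n / ratio) * ratio, done in integer arithmetic. It also resolves the default of 0 to 2 * #cores before rounding; that is an assumption about one way the ratio could apply when server threads are not set explicitly, not necessarily what the PR ends up doing. All names here are hypothetical.

```java
// Hypothetical helper, not Spark's API: sketches "round up to the nearest
// multiple of the ratio" as described in the serverThreads() Javadoc.
final class ServerThreadRounding {

  // Integer form of ceil(n / ratio) * ratio for n >= 0 and ratio >= 1.
  static int roundUpToMultiple(int n, int ratio) {
    if (ratio < 1) {
      throw new IllegalArgumentException("ratio must be >= 1: " + ratio);
    }
    return ((n + ratio - 1) / ratio) * ratio;
  }

  // Assumption: resolve the default (0 => 2 * #cores) first, so the ratio
  // also applies when server threads are not configured explicitly.
  static int serverThreads(int configuredServerThreads, int ratio, int availableProcessors) {
    int base = configuredServerThreads > 0 ? configuredServerThreads : 2 * availableProcessors;
    return roundUpToMultiple(base, ratio);
  }

  // Per the Javadoc in the diff, server threads = ratio * chunk fetch handler
  // threads, so the division is exact once server threads are a multiple.
  static int chunkFetchHandlerThreads(int serverThreads, int ratio) {
    return serverThreads / ratio;
  }

  public static void main(String[] args) {
    System.out.println(roundUpToMultiple(10, 3));    // 12
    System.out.println(serverThreads(0, 3, 16));     // 33
    System.out.println(serverThreads(30, 4, 16));    // 32
  }
}
```

Staying in integer arithmetic avoids floating-point edge cases, and resolving the default first is what lets the ratio take effect without an explicit server thread setting.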
----------------------------------------------------------------
This is an automated message from the Apache Git Service.