This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 233af3d  [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level 
user documentation
233af3d is described below

commit 233af3d2396391185c45b69c4bd2bf9ca8f1af67
Author: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com>
AuthorDate: Mon Aug 16 10:24:40 2021 -0500

    [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation
    
    ### What changes were proposed in this pull request?
    
    Document the push-based shuffle feature with a high level overview of the 
feature and corresponding configuration options for both shuffle server side as 
well as client side. This is how the changes to the doc looks on the browser 
([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png))
    
    ### Why are the changes needed?
    
    Helps users understand the feature
    
    ### Does this PR introduce _any_ user-facing change?
    
    Docs
    
    ### How was this patch tested?
    
    N/A
    
    Closes #33615 from venkata91/SPARK-36374.
    
    Authored-by: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 2270ecf32f7ae478570145219d2ce71a642076cf)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../apache/spark/network/util/TransportConf.java   |  28 ++++--
 .../shuffle/RemoteBlockPushResolverSuite.java      |   2 +-
 .../org/apache/spark/internal/config/package.scala |  63 ++++++------
 docs/configuration.md                              | 106 +++++++++++++++++++++
 4 files changed, 157 insertions(+), 42 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 69b8b25..ed0ca918 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -390,24 +390,32 @@ public class TransportConf {
   /**
    * The minimum size of a chunk when dividing a merged shuffle file into 
multiple chunks during
    * push-based shuffle.
-   * A merged shuffle file consists of multiple small shuffle blocks. Fetching 
the
-   * complete merged shuffle file in a single response increases the memory 
requirements for the
-   * clients. Instead of serving the entire merged file, the shuffle service 
serves the
-   * merged file in `chunks`. A `chunk` constitutes few shuffle blocks in 
entirety and this
-   * configuration controls how big a chunk can get. A corresponding index 
file for each merged
-   * shuffle file will be generated indicating chunk boundaries.
+   * A merged shuffle file consists of multiple small shuffle blocks. Fetching 
the complete
+   * merged shuffle file in a single disk I/O increases the memory 
requirements for both the
+   * clients and the external shuffle service. Instead, the external shuffle 
service serves
+   * the merged file in MB-sized chunks. This configuration controls how big a 
chunk can get.
+   * A corresponding index file for each merged shuffle file will be generated 
indicating chunk
+   * boundaries.
+   *
+   * Setting this too high would increase the memory requirements on both the 
clients and the
+   * external shuffle service.
+   *
+   * Setting this too low would increase the overall number of RPC requests to 
external shuffle
+   * service unnecessarily.
    */
   public int minChunkSizeInMergedShuffleFile() {
     return Ints.checkedCast(JavaUtils.byteStringAsBytes(
-      conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
+      conf.get("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", 
"2m")));
   }
 
   /**
-   * The size of cache in memory which is used in push-based shuffle for 
storing merged index files.
+   * The maximum size of cache in memory which is used in push-based shuffle 
for storing merged
+   * index files. This cache is in addition to the one configured via
+   * spark.shuffle.service.index.cache.size.
    */
   public long mergedIndexCacheSize() {
     return JavaUtils.byteStringAsBytes(
-      conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
+      conf.get("spark.shuffle.push.server.mergedIndexCacheSize", "100m"));
   }
 
   /**
@@ -417,7 +425,7 @@ public class TransportConf {
    * blocks for this shuffle partition.
    */
   public int ioExceptionsThresholdDuringMerge() {
-    return 
conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
+    return 
conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
 
   /**
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 46d6366..d7881f0 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -87,7 +87,7 @@ public class RemoteBlockPushResolverSuite {
   public void before() throws IOException {
     localDirs = createLocalDirs(2);
     MapConfigProvider provider = new MapConfigProvider(
-      ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile", 
"4"));
+      
ImmutableMap.of("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", 
"4"));
     conf = new TransportConf("shuffle", provider);
     pushResolver = new RemoteBlockPushResolver(conf);
     registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), 
MERGE_DIRECTORY_META);
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 17c585d..7ed1f1d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2097,31 +2097,33 @@ package object config {
 
   private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
     ConfigBuilder("spark.shuffle.push.enabled")
-      .doc("Set to 'true' to enable push-based shuffle on the client side and 
this works in " +
+      .doc("Set to true to enable push-based shuffle on the client side and 
this works in " +
         "conjunction with the server side flag 
spark.shuffle.server.mergedShuffleFileManagerImpl " +
         "which needs to be set with the appropriate " +
         "org.apache.spark.network.shuffle.MergedShuffleFileManager 
implementation for push-based " +
         "shuffle to be enabled")
-      .version("3.1.0")
+      .version("3.2.0")
       .booleanConf
       .createWithDefault(false)
 
   private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
-    ConfigBuilder("spark.shuffle.push.merge.results.timeout")
-      .doc("Specify the max amount of time DAGScheduler waits for the merge 
results from " +
-        "all remote shuffle services for a given shuffle. DAGScheduler will 
start to submit " +
-        "following stages if not all results are received within the timeout.")
+    ConfigBuilder("spark.shuffle.push.results.timeout")
+      .internal()
+      .doc("The maximum amount of time driver waits in seconds for the merge 
results to be" +
+        " received from all remote external shuffle services for a given 
shuffle. Driver" +
+        " submits following stages if not all results are received within the 
timeout. Setting" +
+        " this too long could potentially lead to performance regression")
       .version("3.2.0")
       .timeConf(TimeUnit.SECONDS)
       .checkValue(_ >= 0L, "Timeout must be >= 0.")
       .createWithDefaultString("10s")
 
   private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT =
-    ConfigBuilder("spark.shuffle.push.merge.finalize.timeout")
-      .doc("Specify the amount of time DAGScheduler waits after all mappers 
finish for " +
-        "a given shuffle map stage before it starts sending merge finalize 
requests to " +
-        "remote shuffle services. This allows the shuffle services some extra 
time to " +
-        "merge as many blocks as possible.")
+    ConfigBuilder("spark.shuffle.push.finalize.timeout")
+      .doc("The amount of time driver waits, after all mappers have finished 
for a given" +
+        " shuffle map stage, before it sends merge finalize requests to remote 
external shuffle" +
+        " services. This gives the external shuffle services extra time to 
merge blocks. Setting" +
+        " this too long could potentially lead to performance regression")
       .version("3.2.0")
       .timeConf(TimeUnit.SECONDS)
       .checkValue(_ >= 0L, "Timeout must be >= 0.")
@@ -2129,54 +2131,53 @@ package object config {
 
   private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS =
     ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations")
-      .doc("Maximum number of shuffle push merger locations cached for push 
based shuffle. " +
-        "Currently, shuffle push merger locations are nothing but external 
shuffle services " +
-        "which are responsible for handling pushed blocks and merging them and 
serving " +
-        "merged blocks for later shuffle fetch.")
-      .version("3.1.0")
+      .doc("Maximum number of merger locations cached for push-based shuffle. 
Currently, merger" +
+        " locations are hosts of external shuffle services responsible for 
handling pushed" +
+        " blocks, merging them and serving merged blocks for later shuffle 
fetch.")
+      .version("3.2.0")
       .intConf
       .createWithDefault(500)
 
   private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
     ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
-      .doc("The minimum number of shuffle merger locations required to enable 
push based " +
-        "shuffle for a stage. This is specified as a ratio of the number of 
partitions in " +
-        "the child stage. For example, a reduce stage which has 100 partitions 
and uses the " +
-        "default value 0.05 requires at least 5 unique merger locations to 
enable push based " +
-        "shuffle. Merger locations are currently defined as external shuffle 
services.")
-      .version("3.1.0")
+      .doc("Ratio used to compute the minimum number of shuffle merger 
locations required for" +
+        " a stage based on the number of partitions for the reducer stage. For 
example, a reduce" +
+        " stage which has 100 partitions and uses the default value 0.05 
requires at least 5" +
+        " unique merger locations to enable push-based shuffle. Merger 
locations are currently" +
+        " defined as external shuffle services.")
+      .version("3.2.0")
       .doubleConf
       .createWithDefault(0.05)
 
   private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
     ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
       .doc(s"The static threshold for number of shuffle push merger locations 
should be " +
-        "available in order to enable push based shuffle for a stage. Note 
this config " +
+        "available in order to enable push-based shuffle for a stage. Note 
this config " +
         s"works in conjunction with 
${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " +
         "Maximum of spark.shuffle.push.mergersMinStaticThreshold and " +
         s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of 
mergers needed to " +
-        "enable push based shuffle for a stage. For eg: with 1000 partitions 
for the child " +
+        "enable push-based shuffle for a stage. For eg: with 1000 partitions 
for the child " +
         "stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
         s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we 
would need " +
-        "at least 50 mergers to enable push based shuffle for that stage.")
-      .version("3.1.0")
+        "at least 50 mergers to enable push-based shuffle for that stage.")
+      .version("3.2.0")
       .intConf
       .createWithDefault(5)
 
   private[spark] val SHUFFLE_NUM_PUSH_THREADS =
     ConfigBuilder("spark.shuffle.push.numPushThreads")
       .doc("Specify the number of threads in the block pusher pool. These 
threads assist " +
-        "in creating connections and pushing blocks to remote shuffle 
services. By default, the " +
-        "threadpool size is equal to the number of spark executor cores.")
+        "in creating connections and pushing blocks to remote external shuffle 
services. By" +
+        " default, the threadpool size is equal to the number of spark 
executor cores.")
       .version("3.2.0")
       .intConf
       .createOptional
 
   private[spark] val SHUFFLE_MAX_BLOCK_SIZE_TO_PUSH =
     ConfigBuilder("spark.shuffle.push.maxBlockSizeToPush")
-      .doc("The max size of an individual block to push to the remote shuffle 
services. Blocks " +
-       "larger than this threshold are not pushed to be merged remotely. These 
shuffle blocks " +
-       "will be fetched by the executors in the original manner.")
+      .doc("The max size of an individual block to push to the remote external 
shuffle services." +
+        " Blocks larger than this threshold are not pushed to be merged 
remotely. These shuffle" +
+        " blocks will be fetched by the executors in the original manner.")
       .version("3.2.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1m")
diff --git a/docs/configuration.md b/docs/configuration.md
index a4fdc4c..770c8b4 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3152,3 +3152,109 @@ The stage level scheduling feature allows users to 
specify task and executor res
 This is only available for the RDD API in Scala, Java, and Python.  It is 
available on YARN and Kubernetes when dynamic allocation is enabled. See the 
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or 
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page 
for more implementation details.
 
 See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this 
feature. The current implementation acquires new executors for each 
`ResourceProfile`  created and currently has to be an exact match. Spark does 
not try to fit tasks into an executor that require a different ResourceProfile 
than the executor was created with. Executors that are not in use will idle 
timeout with the dynamic allocation logic. The default configuration for this 
feature is to only allow one Resour [...]
+
+# Push-based shuffle overview
+
+Push-based shuffle helps improve the reliability and performance of spark 
shuffle. It takes a best-effort approach to push the shuffle blocks generated 
by the map tasks to remote external shuffle services to be merged per shuffle 
partition. Reduce tasks fetch a combination of merged shuffle partitions and 
original shuffle blocks as their input data, resulting in converting small 
random disk reads by external shuffle services into large sequential reads. 
Possibility of better data localit [...]
+
+<p> Push-based shuffle improves performance for long running jobs/queries 
which involves large disk I/O during shuffle. Currently it is not well suited 
for jobs/queries which runs quickly dealing with lesser amount of shuffle data. 
This will be further improved in the future releases.</p>
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN with 
external shuffle service. </b></p>
+
+### External Shuffle service(server) side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since 
Version</th></tr>
+<tr>
+  <td><code>spark.shuffle.push.server.mergedShuffleFileManagerImpl</code></td>
+  <td>
+    <code>org.apache.spark.network.shuffle.<br 
/>NoOpMergedShuffleFileManager</code>
+  </td>
+  <td>
+    Class name of the implementation of <code>MergedShuffleFileManager</code> 
that manages push-based shuffle. This acts as a server side config to disable 
or enable push-based shuffle. By default, push-based shuffle is disabled at the 
server side. <p> To enable push-based shuffle on the server side, set this 
config to 
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code></p>
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  
<td><code>spark.shuffle.push.server.minChunkSizeInMergedShuffleFile</code></td>
+  <td><code>2m</code></td>
+  <td>
+    <p> The minimum size of a chunk when dividing a merged shuffle file into 
multiple chunks during push-based shuffle. A merged shuffle file consists of 
multiple small shuffle blocks. Fetching the complete merged shuffle file in a 
single disk I/O increases the memory requirements for both the clients and the 
external shuffle services. Instead, the external shuffle service serves the 
merged file in <code>MB-sized chunks</code>.<br /> This configuration controls 
how big a chunk can get. A [...]
+    <p> Setting this too high would increase the memory requirements on both 
the clients and the external shuffle service. </p>
+    <p> Setting this too low would increase the overall number of RPC requests 
to external shuffle service unnecessarily.</p>
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.server.mergedIndexCacheSize</code></td>
+  <td><code>100m</code></td>
+  <td>
+    The maximum size of cache in memory which could be used in push-based 
shuffle for storing merged index files. This cache is in addition to the one 
configured via <code>spark.shuffle.service.index.cache.size</code>.
+  </td>
+  <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since 
Version</th></tr>
+<tr>
+  <td><code>spark.shuffle.push.enabled</code></td>
+  <td><code>false</code></td>
+  <td>
+    Set to true to enable push-based shuffle on the client side and works in 
conjunction with the server side flag 
<code>spark.shuffle.server.mergedShuffleFileManagerImpl</code>.
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.finalize.timeout</code></td>
+  <td><code>10s</code></td>
+  <td>
+    The amount of time driver waits in seconds, after all mappers have 
finished for a given shuffle map stage, before it sends merge finalize requests 
to remote external shuffle services. This gives the external shuffle services 
extra time to merge blocks. Setting this too long could potentially lead to 
performance regression.
+  </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.maxRetainedMergerLocations</code></td>
+  <td><code>500</code></td>
+  <td>
+    Maximum number of merger locations cached for push-based shuffle. 
Currently, merger locations are hosts of external shuffle services responsible 
for handling pushed blocks, merging them and serving merged blocks for later 
shuffle fetch.
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.mergersMinThresholdRatio</code></td>
+  <td><code>0.05</code></td>
+  <td>
+    Ratio used to compute the minimum number of shuffle merger locations 
required for a stage based on the number of partitions for the reducer stage. 
For example, a reduce stage which has 100 partitions and uses the default value 
0.05 requires at least 5 unique merger locations to enable push-based shuffle.
+  </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.mergersMinStaticThreshold</code></td>
+  <td><code>5</code></td>
+  <td>
+    The static threshold for number of shuffle push merger locations should be 
available in order to enable push-based shuffle for a stage. Note this config 
works in conjunction with 
<code>spark.shuffle.push.mergersMinThresholdRatio</code>. Maximum of 
<code>spark.shuffle.push.mergersMinStaticThreshold</code> and 
<code>spark.shuffle.push.mergersMinThresholdRatio</code> ratio number of 
mergers needed to enable push-based shuffle for a stage. For example: with 1000 
partitions for the child  [...]
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.maxBlockSizeToPush</code></td>
+  <td><code>1m</code></td>
+  <td>
+    <p> The max size of an individual block to push to the remote external 
shuffle services. Blocks larger than this threshold are not pushed to be merged 
remotely. These shuffle blocks will be fetched in the original manner. </p>
+    <p> Setting this too high would result in more blocks to be pushed to 
remote external shuffle services but those are already efficiently fetched with 
the existing mechanisms resulting in additional overhead of pushing the large 
blocks to remote external shuffle services. It is recommended to set 
<code>spark.shuffle.push.maxBlockSizeToPush</code> lesser than 
<code>spark.shuffle.push.maxBlockBatchSize</code> config's value. </p>
+    <p> Setting this too low would result in lesser number of blocks getting 
merged and directly fetched from mapper external shuffle service results in 
higher small random reads affecting overall disk I/O performance. </p>
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.maxBlockBatchSize</code></td>
+  <td><code>3m</code></td>
+  <td>
+    The max size of a batch of shuffle blocks to be grouped into a single push 
request. Default is set to <code>3m</code> in order to keep it slightly higher 
than <code>spark.storage.memoryMapThreshold</code> default which is 
<code>2m</code> as it is very likely that each batch of block gets memory 
mapped which incurs higher overhead.
+  </td>
+  <td>3.2.0</td>
+</tr>
+</table>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to