mridulm commented on a change in pull request #33615:
URL: https://github.com/apache/spark/pull/33615#discussion_r682441410
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN.
</b></p>
+
+### Shuffle server side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+
<code>org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of MergedShuffleFileManager that manages
push-based shuffle. This acts as a server side config to disable or enable
push-based shuffle. By default, push-based shuffle is disabled at the server
side, which does not perform push-based shuffle irrespective of the client side
configuration. To turn on push-based shuffle at the server side, set the
configuration to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.minChunkSizeInMergedShuffleFile</code></td>
+ <td><code>2m</code></td>
+ <td>
+ The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during push-based shuffle. A merged shuffle file consists of
multiple small shuffle blocks. Fetching the complete merged shuffle file in a
single disk I/O increases the memory requirements for both the clients and the
shuffle service. Instead, the shuffle service serves the merged file in <code>
MB-sized chunks </code>. This configuration controls how big a chunk can get. A
corresponding index file for each merged shuffle file will be generated
indicating chunk boundaries.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedIndexCacheSize</code></td>
+ <td><code>100m</code></td>
+ <td>
+ The size of cache in memory which is used in push-based shuffle for
storing merged index files. This cache is in addition to the one configured via
<code>spark.shuffle.service.index.cache.size</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.ioExceptionsThresholdDuringMerge</code></td>
+ <td><code>4</code></td>
+ <td>
+ The threshold for number of IOExceptions while merging shuffle blocks to a
shuffle partition. When the number of IOExceptions while writing to the merged
shuffle data/index/meta files exceed this threshold then the shuffle server
will stop merging shuffle blocks for this shuffle partition.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.enabled</code></td>
+ <td><code>false</code></td>
+ <td>
+ Set to 'true' to enable push-based shuffle on the client side and this
works in conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl which needs to be set with
the appropriate org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based shuffle to be enabled
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.merge.finalize.timeout</code></td>
+ <td><code>10s</code></td>
+ <td>
+ Specify the amount of time DAGScheduler waits after all mappers finish for
a given shuffle map stage before it starts sending merge finalize requests to
remote shuffle services. This allows the shuffle services some extra time to
merge as many blocks as possible.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxRetainedMergerLocations</code></td>
+ <td><code>500</code></td>
+ <td>
+ Maximum number of shuffle push merger locations cached for push based
shuffle. Currently, shuffle push merger locations are nothing but external
shuffle services which are responsible for handling pushed blocks and merging
them and serving merged blocks for later shuffle fetch.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinThresholdRatio</code></td>
+ <td><code>0.05</code></td>
+ <td>
+ The minimum number of shuffle merger locations required to enable push
based shuffle for a stage. This is specified as a ratio of the number of
partitions in the child stage. For example, a reduce stage which has 100
partitions and uses the default value 0.05 requires at least 5 unique merger
locations to enable push based shuffle. Merger locations are currently defined
as external shuffle services.
Review comment:
`The minimum number of shuffle merger locations required to enable push
based shuffle for a stage. This is specified as a ratio of the number of
partitions in the child stage.` ->
`Ratio used to compute the minimum number of shuffle merger locations
required for a stage, based on the number of partitions in the reducer stage.`
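For reference, a minimal sketch of how this ratio appears to combine with `spark.shuffle.push.mergersMinStaticThreshold`, going only by the descriptions in this diff (the maximum of the static threshold and the ratio applied to the partition count). The function name, parameter names, and the ceiling rounding are assumptions made for illustration and are not taken from the Spark source:

```python
import math


def min_mergers_required(num_partitions: int,
                         min_threshold_ratio: float = 0.05,
                         min_static_threshold: int = 5) -> int:
    """Hypothetical helper mirroring the documented rule, not Spark's code.

    Returns the larger of the static threshold and the ratio-based
    threshold for a reducer stage with `num_partitions` partitions.
    """
    # Assumption: the ratio-based value is rounded up to a whole merger count.
    ratio_based = math.ceil(num_partitions * min_threshold_ratio)
    return max(min_static_threshold, ratio_based)


# The two examples given in the doc text:
print(min_mergers_required(100))   # 100 partitions, ratio 0.05
print(min_mergers_required(1000))  # 1000 partitions, ratio 0.05, static 5
```

With the defaults above, the two calls correspond to the "at least 5" and "at least 50" examples in the proposed documentation, which may be a useful check on whichever wording is chosen.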
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN.
</b></p>
+
+### Shuffle server side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+
<code>org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of MergedShuffleFileManager that manages
push-based shuffle. This acts as a server side config to disable or enable
push-based shuffle. By default, push-based shuffle is disabled at the server
side, which does not perform push-based shuffle irrespective of the client side
configuration. To turn on push-based shuffle at the server side, set the
configuration to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.minChunkSizeInMergedShuffleFile</code></td>
+ <td><code>2m</code></td>
+ <td>
+ The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during push-based shuffle. A merged shuffle file consists of
multiple small shuffle blocks. Fetching the complete merged shuffle file in a
single disk I/O increases the memory requirements for both the clients and the
shuffle service. Instead, the shuffle service serves the merged file in <code>
MB-sized chunks </code>. This configuration controls how big a chunk can get. A
corresponding index file for each merged shuffle file will be generated
indicating chunk boundaries.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedIndexCacheSize</code></td>
+ <td><code>100m</code></td>
+ <td>
+ The size of cache in memory which is used in push-based shuffle for
storing merged index files. This cache is in addition to the one configured via
<code>spark.shuffle.service.index.cache.size</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.ioExceptionsThresholdDuringMerge</code></td>
+ <td><code>4</code></td>
+ <td>
+ The threshold for number of IOExceptions while merging shuffle blocks to a
shuffle partition. When the number of IOExceptions while writing to the merged
shuffle data/index/meta files exceed this threshold then the shuffle server
will stop merging shuffle blocks for this shuffle partition.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.enabled</code></td>
+ <td><code>false</code></td>
+ <td>
+ Set to 'true' to enable push-based shuffle on the client side and this
works in conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl which needs to be set with
the appropriate org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based shuffle to be enabled
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.merge.finalize.timeout</code></td>
+ <td><code>10s</code></td>
+ <td>
+ Specify the amount of time DAGScheduler waits after all mappers finish for
a given shuffle map stage before it starts sending merge finalize requests to
remote shuffle services. This allows the shuffle services some extra time to
merge as many blocks as possible.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxRetainedMergerLocations</code></td>
+ <td><code>500</code></td>
+ <td>
+ Maximum number of shuffle push merger locations cached for push based
shuffle. Currently, shuffle push merger locations are nothing but external
shuffle services which are responsible for handling pushed blocks and merging
them and serving merged blocks for later shuffle fetch.
Review comment:
`Maximum number of shuffle push merger locations cached for push based
shuffle. Currently, shuffle push merger locations are nothing but external
shuffle services which are responsible for handling pushed blocks and merging
them and serving merged blocks for later shuffle fetch.` ->
`Maximum number of merger locations cached for push-based shuffle.
Currently, merger locations are hosts of external shuffle services responsible
for handling pushed blocks, merging them, and serving merged blocks for later
shuffle fetch.`
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN.
</b></p>
+
+### Shuffle server side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+
<code>org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of MergedShuffleFileManager that manages
push-based shuffle. This acts as a server side config to disable or enable
push-based shuffle. By default, push-based shuffle is disabled at the server
side, which does not perform push-based shuffle irrespective of the client side
configuration. To turn on push-based shuffle at the server side, set the
configuration to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.minChunkSizeInMergedShuffleFile</code></td>
+ <td><code>2m</code></td>
+ <td>
+ The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during push-based shuffle. A merged shuffle file consists of
multiple small shuffle blocks. Fetching the complete merged shuffle file in a
single disk I/O increases the memory requirements for both the clients and the
shuffle service. Instead, the shuffle service serves the merged file in <code>
MB-sized chunks </code>. This configuration controls how big a chunk can get. A
corresponding index file for each merged shuffle file will be generated
indicating chunk boundaries.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedIndexCacheSize</code></td>
+ <td><code>100m</code></td>
+ <td>
+ The size of cache in memory which is used in push-based shuffle for
storing merged index files. This cache is in addition to the one configured via
<code>spark.shuffle.service.index.cache.size</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.ioExceptionsThresholdDuringMerge</code></td>
+ <td><code>4</code></td>
+ <td>
+ The threshold for number of IOExceptions while merging shuffle blocks to a
shuffle partition. When the number of IOExceptions while writing to the merged
shuffle data/index/meta files exceed this threshold then the shuffle server
will stop merging shuffle blocks for this shuffle partition.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.enabled</code></td>
+ <td><code>false</code></td>
+ <td>
+ Set to 'true' to enable push-based shuffle on the client side and this
works in conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl which needs to be set with
the appropriate org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based shuffle to be enabled
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.merge.finalize.timeout</code></td>
+ <td><code>10s</code></td>
+ <td>
+ Specify the amount of time DAGScheduler waits after all mappers finish for
a given shuffle map stage before it starts sending merge finalize requests to
remote shuffle services. This allows the shuffle services some extra time to
merge as many blocks as possible.
Review comment:
`Specify the amount of time DAGScheduler waits after all mappers finish
for a given shuffle map stage before it starts sending merge finalize requests
to remote shuffle services. This allows the shuffle services some extra time to
merge as many blocks as possible.` ->
`The amount of time the driver waits, after all mappers have finished for a
given shuffle map stage, before it sends merge finalize requests to remote
shuffle services. This gives the shuffle services extra time to merge blocks.`
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN.
</b></p>
+
+### Shuffle server side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+
<code>org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of MergedShuffleFileManager that manages
push-based shuffle. This acts as a server side config to disable or enable
push-based shuffle. By default, push-based shuffle is disabled at the server
side, which does not perform push-based shuffle irrespective of the client side
configuration. To turn on push-based shuffle at the server side, set the
configuration to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.minChunkSizeInMergedShuffleFile</code></td>
+ <td><code>2m</code></td>
+ <td>
+ The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during push-based shuffle. A merged shuffle file consists of
multiple small shuffle blocks. Fetching the complete merged shuffle file in a
single disk I/O increases the memory requirements for both the clients and the
shuffle service. Instead, the shuffle service serves the merged file in <code>
MB-sized chunks </code>. This configuration controls how big a chunk can get. A
corresponding index file for each merged shuffle file will be generated
indicating chunk boundaries.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedIndexCacheSize</code></td>
+ <td><code>100m</code></td>
+ <td>
+ The size of cache in memory which is used in push-based shuffle for
storing merged index files. This cache is in addition to the one configured via
<code>spark.shuffle.service.index.cache.size</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.ioExceptionsThresholdDuringMerge</code></td>
+ <td><code>4</code></td>
+ <td>
+ The threshold for number of IOExceptions while merging shuffle blocks to a
shuffle partition. When the number of IOExceptions while writing to the merged
shuffle data/index/meta files exceed this threshold then the shuffle server
will stop merging shuffle blocks for this shuffle partition.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.enabled</code></td>
+ <td><code>false</code></td>
+ <td>
+ Set to 'true' to enable push-based shuffle on the client side and this
works in conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl which needs to be set with
the appropriate org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based shuffle to be enabled
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.merge.finalize.timeout</code></td>
+ <td><code>10s</code></td>
+ <td>
+ Specify the amount of time DAGScheduler waits after all mappers finish for
a given shuffle map stage before it starts sending merge finalize requests to
remote shuffle services. This allows the shuffle services some extra time to
merge as many blocks as possible.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxRetainedMergerLocations</code></td>
+ <td><code>500</code></td>
+ <td>
+ Maximum number of shuffle push merger locations cached for push based
shuffle. Currently, shuffle push merger locations are nothing but external
shuffle services which are responsible for handling pushed blocks and merging
them and serving merged blocks for later shuffle fetch.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinThresholdRatio</code></td>
+ <td><code>0.05</code></td>
+ <td>
+ The minimum number of shuffle merger locations required to enable push
based shuffle for a stage. This is specified as a ratio of the number of
partitions in the child stage. For example, a reduce stage which has 100
partitions and uses the default value 0.05 requires at least 5 unique merger
locations to enable push based shuffle. Merger locations are currently defined
as external shuffle services.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinStaticThreshold</code></td>
+ <td><code>5</code></td>
+ <td>
+ The static threshold for number of shuffle push merger locations should be
available in order to enable push based shuffle for a stage. Note this config
works in conjunction with
<code>spark.shuffle.push.mergersMinThresholdRatio</code>. Maximum of
<code>spark.shuffle.push.mergersMinStaticThreshold</code> and
<code>spark.shuffle.push.mergersMinThresholdRatio</code> ratio number of
mergers needed to enable push based shuffle for a stage. For eg: with 1000
partitions for the child stage with
spark.shuffle.push.mergersMinStaticThreshold as 5 and
spark.shuffle.push.mergersMinThresholdRatio set to 0.05, we would need at least
50 mergers to enable push based shuffle for that stage.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.numPushThreads</code></td>
+ <td><code>none</code></td>
+ <td>
+ Specify the number of threads in the block pusher pool. These threads
assist in creating connections and pushing blocks to remote shuffle services.
By default, the threadpool size is equal to the number of spark executor cores.
Review comment:
Do we want to expose this config?
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN.
</b></p>
+
+### Shuffle server side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+
<code>org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of MergedShuffleFileManager that manages
push-based shuffle. This acts as a server side config to disable or enable
push-based shuffle. By default, push-based shuffle is disabled at the server
side, which does not perform push-based shuffle irrespective of the client side
configuration. To turn on push-based shuffle at the server side, set the
configuration to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.minChunkSizeInMergedShuffleFile</code></td>
+ <td><code>2m</code></td>
+ <td>
+ The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during push-based shuffle. A merged shuffle file consists of
multiple small shuffle blocks. Fetching the complete merged shuffle file in a
single disk I/O increases the memory requirements for both the clients and the
shuffle service. Instead, the shuffle service serves the merged file in <code>
MB-sized chunks </code>. This configuration controls how big a chunk can get. A
corresponding index file for each merged shuffle file will be generated
indicating chunk boundaries.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedIndexCacheSize</code></td>
+ <td><code>100m</code></td>
+ <td>
+ The size of cache in memory which is used in push-based shuffle for
storing merged index files. This cache is in addition to the one configured via
<code>spark.shuffle.service.index.cache.size</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.ioExceptionsThresholdDuringMerge</code></td>
+ <td><code>4</code></td>
+ <td>
+ The threshold for number of IOExceptions while merging shuffle blocks to a
shuffle partition. When the number of IOExceptions while writing to the merged
shuffle data/index/meta files exceed this threshold then the shuffle server
will stop merging shuffle blocks for this shuffle partition.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.enabled</code></td>
+ <td><code>false</code></td>
+ <td>
+ Set to 'true' to enable push-based shuffle on the client side and this
works in conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl which needs to be set with
the appropriate org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based shuffle to be enabled
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.merge.finalize.timeout</code></td>
+ <td><code>10s</code></td>
+ <td>
+ Specify the amount of time DAGScheduler waits after all mappers finish for
a given shuffle map stage before it starts sending merge finalize requests to
remote shuffle services. This allows the shuffle services some extra time to
merge as many blocks as possible.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxRetainedMergerLocations</code></td>
+ <td><code>500</code></td>
+ <td>
+ Maximum number of shuffle push merger locations cached for push based
shuffle. Currently, shuffle push merger locations are nothing but external
shuffle services which are responsible for handling pushed blocks and merging
them and serving merged blocks for later shuffle fetch.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinThresholdRatio</code></td>
+ <td><code>0.05</code></td>
+ <td>
+ The minimum number of shuffle merger locations required to enable push
based shuffle for a stage. This is specified as a ratio of the number of
partitions in the child stage. For example, a reduce stage which has 100
partitions and uses the default value 0.05 requires at least 5 unique merger
locations to enable push based shuffle. Merger locations are currently defined
as external shuffle services.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinStaticThreshold</code></td>
+ <td><code>5</code></td>
+ <td>
+ The static threshold for number of shuffle push merger locations should be
available in order to enable push based shuffle for a stage. Note this config
works in conjunction with
<code>spark.shuffle.push.mergersMinThresholdRatio</code>. Maximum of
<code>spark.shuffle.push.mergersMinStaticThreshold</code> and
<code>spark.shuffle.push.mergersMinThresholdRatio</code> ratio number of
mergers needed to enable push based shuffle for a stage. For eg: with 1000
partitions for the child stage with
spark.shuffle.push.mergersMinStaticThreshold as 5 and
spark.shuffle.push.mergersMinThresholdRatio set to 0.05, we would need at least
50 mergers to enable push based shuffle for that stage.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.numPushThreads</code></td>
+ <td><code>none</code></td>
+ <td>
+ Specify the number of threads in the block pusher pool. These threads
assist in creating connections and pushing blocks to remote shuffle services.
By default, the threadpool size is equal to the number of spark executor cores.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxBlockSizeToPush</code></td>
+ <td><code>1m</code></td>
+ <td>
+ The max size of an individual block to push to the remote shuffle
services. Blocks larger than this threshold are not pushed to be merged
remotely. These shuffle blocks will be fetched by the executors in the original
manner.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxBlockBatchSize</code></td>
+ <td><code>3m</code></td>
+ <td>
+ The max size of a batch of shuffle blocks to be grouped into a single push
request.
Review comment:
There is an additional comment in the code for
`spark.shuffle.push.maxBlockBatchSize`. Is it (or some subset of it) relevant
enough to document here?
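For readers of this thread, here is a rough sketch of how the two size limits in the table could interact. It assumes a simple greedy grouping; the helper name `group_into_push_batches` and the greedy strategy are illustrative assumptions, not Spark's actual block pusher logic. Only the two config names and their defaults (`1m`, `3m`) come from the diff.

```python
from typing import List

MIB = 1024 * 1024


def group_into_push_batches(
    block_sizes: List[int],
    max_block_size_to_push: int = 1 * MIB,  # default of spark.shuffle.push.maxBlockSizeToPush
    max_block_batch_size: int = 3 * MIB,    # default of spark.shuffle.push.maxBlockBatchSize
) -> List[List[int]]:
    """Hypothetical helper: greedily group block sizes into push requests."""
    batches: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for size in block_sizes:
        if size > max_block_size_to_push:
            # Blocks above the per-block limit are not pushed; reducers
            # fetch them in the original manner instead.
            continue
        if current and current_size + size > max_block_batch_size:
            # Adding this block would exceed the batch limit, so close the batch.
            batches.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        batches.append(current)
    return batches


batches = group_into_push_batches([MIB, MIB, MIB, 2 * MIB, MIB // 2])
```

With the defaults, the 2 MiB block is skipped, the three 1 MiB blocks fill one request, and the remaining 512 KiB block goes into a second request.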
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN.
</b></p>
+
+### Shuffle server side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+
<code>org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of MergedShuffleFileManager that manages
push-based shuffle. This acts as a server side config to disable or enable
push-based shuffle. By default, push-based shuffle is disabled at the server
side, which does not perform push-based shuffle irrespective of the client side
configuration. To turn on push-based shuffle at the server side, set the
configuration to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.minChunkSizeInMergedShuffleFile</code></td>
+ <td><code>2m</code></td>
+ <td>
+ The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during push-based shuffle. A merged shuffle file consists of
multiple small shuffle blocks. Fetching the complete merged shuffle file in a
single disk I/O increases the memory requirements for both the clients and the
shuffle service. Instead, the shuffle service serves the merged file in <code>
MB-sized chunks </code>. This configuration controls how big a chunk can get. A
corresponding index file for each merged shuffle file will be generated
indicating chunk boundaries.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedIndexCacheSize</code></td>
+ <td><code>100m</code></td>
+ <td>
+ The size of cache in memory which is used in push-based shuffle for
storing merged index files. This cache is in addition to the one configured via
<code>spark.shuffle.service.index.cache.size</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.ioExceptionsThresholdDuringMerge</code></td>
+ <td><code>4</code></td>
+ <td>
+ The threshold for number of IOExceptions while merging shuffle blocks to a
shuffle partition. When the number of IOExceptions while writing to the merged
shuffle data/index/meta files exceed this threshold then the shuffle server
will stop merging shuffle blocks for this shuffle partition.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.enabled</code></td>
+ <td><code>false</code></td>
+ <td>
+ Set to 'true' to enable push-based shuffle on the client side and this
works in conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl which needs to be set with
the appropriate org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based shuffle to be enabled
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.merge.finalize.timeout</code></td>
+ <td><code>10s</code></td>
+ <td>
+ Specify the amount of time DAGScheduler waits after all mappers finish for
a given shuffle map stage before it starts sending merge finalize requests to
remote shuffle services. This allows the shuffle services some extra time to
merge as many blocks as possible.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxRetainedMergerLocations</code></td>
+ <td><code>500</code></td>
+ <td>
+ Maximum number of shuffle push merger locations cached for push based
shuffle. Currently, shuffle push merger locations are nothing but external
shuffle services which are responsible for handling pushed blocks and merging
them and serving merged blocks for later shuffle fetch.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinThresholdRatio</code></td>
+ <td><code>0.05</code></td>
+ <td>
+ The minimum number of shuffle merger locations required to enable push
based shuffle for a stage. This is specified as a ratio of the number of
partitions in the child stage. For example, a reduce stage which has 100
partitions and uses the default value 0.05 requires at least 5 unique merger
locations to enable push based shuffle. Merger locations are currently defined
as external shuffle services.
Review comment:
Can we remove `Merger locations are currently defined as external
shuffle services.`? That is already mentioned above.
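As an aside, the way this ratio combines with `spark.shuffle.push.mergersMinStaticThreshold` (per the descriptions in this diff) can be sketched as below. The function name `min_mergers_required` is hypothetical, and rounding the ratio-based value down is an assumption; the diff only states that the maximum of the two values is used.

```python
import math


def min_mergers_required(
    num_partitions: int,
    static_threshold: int = 5,      # default of spark.shuffle.push.mergersMinStaticThreshold
    threshold_ratio: float = 0.05,  # default of spark.shuffle.push.mergersMinThresholdRatio
) -> int:
    """Hypothetical helper: merger locations needed to enable push-based shuffle for a stage."""
    # Assumption: the ratio-based count is rounded down to a whole number.
    ratio_based = math.floor(num_partitions * threshold_ratio)
    # The docs describe taking the maximum of the static and ratio-based values.
    return max(static_threshold, ratio_based)


required_for_1000 = min_mergers_required(1000)
required_for_100 = min_mergers_required(100)
```

This matches the two worked examples in the table: 1000 child-stage partitions need 50 mergers, and 100 partitions need 5.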
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
Review comment:
`When the reduce tasks start running, they fetch a combination of the
merged shuffle partitions and some of the original shuffle blocks to get their
input data. As a result, push-based shuffle converts shuffle services’ small
random disk reads ` ->
`Reduce tasks fetch a combination of merged shuffle partitions and original
shuffle blocks as their input data, resulting in converting small random disk
reads by shuffle services into`
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
Review comment:
`Push-based shuffle is an improved shuffle architecture that optimizes
the reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push ...` ->
`Push-based shuffle helps improve the reliability and performance of Spark
shuffle. It takes a best-effort approach to push ...`
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
Review comment:
`The reduce tasks are also scheduled with locality preferences of the
locations of their corresponding merged shuffle partitions, which helps to
significantly improve shuffle fetch data locality.` ->
`Possibility of better data locality for reduce tasks additionally helps
minimize network IO.`
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN.
</b></p>
+
+### Shuffle server side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+
<code>org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of MergedShuffleFileManager that manages
push-based shuffle. This acts as a server side config to disable or enable
push-based shuffle. By default, push-based shuffle is disabled at the server
side, which does not perform push-based shuffle irrespective of the client side
configuration. To turn on push-based shuffle at the server side, set the
configuration to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.minChunkSizeInMergedShuffleFile</code></td>
+ <td><code>2m</code></td>
+ <td>
+ The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during push-based shuffle. A merged shuffle file consists of
multiple small shuffle blocks. Fetching the complete merged shuffle file in a
single disk I/O increases the memory requirements for both the clients and the
shuffle service. Instead, the shuffle service serves the merged file in <code>
MB-sized chunks </code>. This configuration controls how big a chunk can get. A
corresponding index file for each merged shuffle file will be generated
indicating chunk boundaries.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedIndexCacheSize</code></td>
+ <td><code>100m</code></td>
+ <td>
+ The size of cache in memory which is used in push-based shuffle for
storing merged index files. This cache is in addition to the one configured via
<code>spark.shuffle.service.index.cache.size</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.ioExceptionsThresholdDuringMerge</code></td>
+ <td><code>4</code></td>
+ <td>
+ The threshold for number of IOExceptions while merging shuffle blocks to a
shuffle partition. When the number of IOExceptions while writing to the merged
shuffle data/index/meta files exceed this threshold then the shuffle server
will stop merging shuffle blocks for this shuffle partition.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.enabled</code></td>
+ <td><code>false</code></td>
+ <td>
+ Set to 'true' to enable push-based shuffle on the client side and this
works in conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl which needs to be set with
the appropriate org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based shuffle to be enabled
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.merge.finalize.timeout</code></td>
+ <td><code>10s</code></td>
+ <td>
+ Specify the amount of time DAGScheduler waits after all mappers finish for
a given shuffle map stage before it starts sending merge finalize requests to
remote shuffle services. This allows the shuffle services some extra time to
merge as many blocks as possible.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxRetainedMergerLocations</code></td>
+ <td><code>500</code></td>
+ <td>
+ Maximum number of shuffle push merger locations cached for push based
shuffle. Currently, shuffle push merger locations are nothing but external
shuffle services which are responsible for handling pushed blocks and merging
them and serving merged blocks for later shuffle fetch.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinThresholdRatio</code></td>
+ <td><code>0.05</code></td>
+ <td>
+ The minimum number of shuffle merger locations required to enable push
based shuffle for a stage. This is specified as a ratio of the number of
partitions in the child stage. For example, a reduce stage which has 100
partitions and uses the default value 0.05 requires at least 5 unique merger
locations to enable push based shuffle. Merger locations are currently defined
as external shuffle services.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinStaticThreshold</code></td>
+ <td><code>5</code></td>
+ <td>
+ The static threshold for number of shuffle push merger locations should be
available in order to enable push based shuffle for a stage. Note this config
works in conjunction with
<code>spark.shuffle.push.mergersMinThresholdRatio</code>. Maximum of
<code>spark.shuffle.push.mergersMinStaticThreshold</code> and
<code>spark.shuffle.push.mergersMinThresholdRatio</code> ratio number of
mergers needed to enable push based shuffle for a stage. For eg: with 1000
partitions for the child stage with
spark.shuffle.push.mergersMinStaticThreshold as 5 and
spark.shuffle.push.mergersMinThresholdRatio set to 0.05, we would need at least
50 mergers to enable push based shuffle for that stage.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.numPushThreads</code></td>
+ <td><code>none</code></td>
+ <td>
+ Specify the number of threads in the block pusher pool. These threads
assist in creating connections and pushing blocks to remote shuffle services.
By default, the threadpool size is equal to the number of spark executor cores.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxBlockSizeToPush</code></td>
+ <td><code>1m</code></td>
+ <td>
+ The max size of an individual block to push to the remote shuffle
services. Blocks larger than this threshold are not pushed to be merged
remotely. These shuffle blocks will be fetched by the executors in the original
manner.
Review comment:
Also, please address the same concern for the chunk size config as well.
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN.
</b></p>
+
+### Shuffle server side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+
<code>org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of MergedShuffleFileManager that manages
push-based shuffle. This acts as a server side config to disable or enable
push-based shuffle. By default, push-based shuffle is disabled at the server
side, which does not perform push-based shuffle irrespective of the client side
configuration. To turn on push-based shuffle at the server side, set the
configuration to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.minChunkSizeInMergedShuffleFile</code></td>
+ <td><code>2m</code></td>
+ <td>
+ The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during push-based shuffle. A merged shuffle file consists of
multiple small shuffle blocks. Fetching the complete merged shuffle file in a
single disk I/O increases the memory requirements for both the clients and the
shuffle service. Instead, the shuffle service serves the merged file in <code>
MB-sized chunks </code>. This configuration controls how big a chunk can get. A
corresponding index file for each merged shuffle file will be generated
indicating chunk boundaries.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedIndexCacheSize</code></td>
+ <td><code>100m</code></td>
+ <td>
+ The size of cache in memory which is used in push-based shuffle for
storing merged index files. This cache is in addition to the one configured via
<code>spark.shuffle.service.index.cache.size</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.server.ioExceptionsThresholdDuringMerge</code></td>
+ <td><code>4</code></td>
+ <td>
+ The threshold for number of IOExceptions while merging shuffle blocks to a
shuffle partition. When the number of IOExceptions while writing to the merged
shuffle data/index/meta files exceed this threshold then the shuffle server
will stop merging shuffle blocks for this shuffle partition.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.enabled</code></td>
+ <td><code>false</code></td>
+ <td>
+ Set to 'true' to enable push-based shuffle on the client side and this
works in conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl which needs to be set with
the appropriate org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based shuffle to be enabled
Review comment:
We can remove the suffix "which needs to be set with the appropriate
org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for
push-based shuffle to be enabled"
That requirement is already documented for the server side flag above.
Review comment:
` and this works` -> `and works`
##########
File path: docs/configuration.md
##########
@@ -3134,3 +3134,119 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one ResourceProfile per stage. If the user associates
more then 1 ResourceProfile to an RDD, Spark will throw an exception by
default. See config `spark.scheduler.resource.profileMergeConflicts` to control
that behavior. The current merge strategy Spark implements when
`spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of
each resource within the conflicting ResourceProfiles. Spark will create a new
ResourceProfile with the max of each of the resources.
+
+# Push-based shuffle overview
+
+Push-based shuffle is an improved shuffle architecture that optimizes the
reliability and performance of the shuffle step in Spark. Complementing the
existing shuffle mechanism, push-based shuffle takes a best-effort approach to
push the shuffle blocks generated by the map tasks to remote shuffle services
to be merged per shuffle partition. When the reduce tasks start running, they
fetch a combination of the merged shuffle partitions and some of the original
shuffle blocks to get their input data. As a result, push-based shuffle
converts shuffle services’ small random disk reads into large sequential reads.
The reduce tasks are also scheduled with locality preferences of the locations
of their corresponding merged shuffle partitions, which helps to significantly
improve shuffle fetch data locality.
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN.
</b></p>
+
+### Shuffle server side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+
<code>org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of MergedShuffleFileManager that manages
push-based shuffle. This acts as a server side config to disable or enable
push-based shuffle. By default, push-based shuffle is disabled at the server
side, which does not perform push-based shuffle irrespective of the client side
configuration. To turn on push-based shuffle at the server side, set the
configuration to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>
+ </td>
Review comment:
`By default, push-based shuffle is disabled at the server side, which
does not perform push-based shuffle irrespective of the client side
configuration` ->
`Currently, push-based shuffle is disabled by default at the server side.`
I am not sure we need the "irrespective of the client side configuration"
suffix. Thoughts?
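
For reference while deciding on the wording, here is a minimal sketch of the interaction the quoted text describes: push-based shuffle only takes effect when the client flag is `true` and the server side is not left on the no-op manager. This is plain Python, not Spark code. The helper name `push_shuffle_effective` and the dict-based configs are hypothetical, and treating any non-default manager class as "enabled" is an assumption made only for illustration.

```python
# Hypothetical model of the two flags discussed in this review; not Spark code.
NOOP_MANAGER = (
    "org.apache.spark.network.shuffle."
    "ExternalBlockHandler$NoOpMergedShuffleFileManager"
)
PUSH_RESOLVER = "org.apache.spark.network.shuffle.RemoteBlockPushResolver"

SERVER_KEY = "spark.shuffle.server.mergedShuffleFileManagerImpl"
CLIENT_KEY = "spark.shuffle.push.enabled"


def push_shuffle_effective(client_conf, server_conf):
    """Return True only when both the client and the server side opt in."""
    # Client side defaults to 'false' per the quoted table.
    client_on = client_conf.get(CLIENT_KEY, "false").lower() == "true"
    # Assumption: any manager other than the no-op default counts as enabled.
    server_on = server_conf.get(SERVER_KEY, NOOP_MANAGER) != NOOP_MANAGER
    return client_on and server_on


if __name__ == "__main__":
    # The client alone cannot turn the feature on.
    print(push_shuffle_effective({CLIENT_KEY: "true"}, {}))
    # Both sides configured.
    print(push_shuffle_effective({CLIENT_KEY: "true"}, {SERVER_KEY: PUSH_RESOLVER}))
```

If the shorter sentence is adopted, the first case (client enabled, server on the default) is the one the dropped suffix used to spell out.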