Repository: spark
Updated Branches:
  refs/heads/master 7cb3f5479 -> 15c03e1e0


[SPARK-4140] Document dynamic allocation

Once the external shuffle service is also documented, the dynamic allocation
section will link to it. Let me know if the whole dynamic allocation section should
be moved to its own page; I personally think the organization might be cleaner that way.

This patch builds on top of oza's work in #3689.

aarondav pwendell

Author: Andrew Or <and...@databricks.com>
Author: Tsuyoshi Ozawa <ozawa.tsuyo...@gmail.com>

Closes #3731 from andrewor14/document-dynamic-allocation and squashes the 
following commits:

1281447 [Andrew Or] Address a few comments
b9843f2 [Andrew Or] Document the configs as well
246fb44 [Andrew Or] Merge branch 'SPARK-4839' of github.com:oza/spark into 
document-dynamic-allocation
8c64004 [Andrew Or] Add documentation for dynamic allocation (without configs)
6827b56 [Tsuyoshi Ozawa] Fixing a documentation of 
spark.dynamicAllocation.enabled.
53cff58 [Tsuyoshi Ozawa] Adding a documentation about dynamic resource 
allocation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15c03e1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15c03e1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15c03e1e

Branch: refs/heads/master
Commit: 15c03e1e0efac29855f32984da7c6b0321f0e37a
Parents: 7cb3f54
Author: Andrew Or <and...@databricks.com>
Authored: Fri Dec 19 19:36:20 2014 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Dec 19 19:36:20 2014 -0800

----------------------------------------------------------------------
 docs/configuration.md  |  61 +++++++++++++++++++++++++
 docs/job-scheduling.md | 108 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 169 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/15c03e1e/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 64aa94f..2c8dea8 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1008,6 +1008,67 @@ Apart from these, the following properties are also 
available, and may be useful
 </tr>
 </table>
 
+#### Dynamic allocation
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.dynamicAllocation.enabled</code></td>
+  <td>false</td>
+  <td>
+    Whether to use dynamic resource allocation, which scales the number of 
executors registered
+    with this application up and down based on the workload. Note that this is currently only
+    available in YARN mode. For more detail, see the description
+    <a href="job-scheduling.html#dynamic-resource-allocation">here</a>.
+    <br><br>
+    This requires the following configurations to be set:
+    <code>spark.dynamicAllocation.minExecutors</code>,
+    <code>spark.dynamicAllocation.maxExecutors</code>, and
+    <code>spark.shuffle.service.enabled</code>
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.minExecutors</code></td>
+  <td>(none)</td>
+  <td>
+    Lower bound for the number of executors if dynamic allocation is enabled 
(required).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.maxExecutors</code></td>
+  <td>(none)</td>
+  <td>
+    Upper bound for the number of executors if dynamic allocation is enabled 
(required).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
+  <td>60</td>
+  <td>
+    If dynamic allocation is enabled and there have been pending tasks 
backlogged for more than
+    this duration (in seconds), new executors will be requested. For more 
detail, see this
+    <a href="job-scheduling.html#resource-allocation-policy">description</a>.
+  </td>
+</tr>
+<tr>
+  
<td><code>spark.dynamicAllocation.sustainedSchedulerBacklogTimeout</code></td>
+  <td><code>schedulerBacklogTimeout</code></td>
+  <td>
+    Same as <code>spark.dynamicAllocation.schedulerBacklogTimeout</code>, but 
used only for
+    subsequent executor requests. For more detail, see this
+    <a href="job-scheduling.html#resource-allocation-policy">description</a>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
+  <td>600</td>
+  <td>
+    If dynamic allocation is enabled and an executor has been idle for more 
than this duration
+    (in seconds), the executor will be removed. For more detail, see this
+    <a href="job-scheduling.html#resource-allocation-policy">description</a>.
+  </td>
+</tr>
+</table>
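+
+As an illustration of how the three timeout properties above interact (the values here are
+purely illustrative, and `spark.dynamicAllocation.enabled`, the executor bounds, and
+`spark.shuffle.service.enabled` must be set as well), one might configure:
+
+{% highlight scala %}
+import org.apache.spark.SparkConf
+
+val conf = new SparkConf()
+  // Request the first executors after tasks have been pending for 60 seconds ...
+  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "60")
+  // ... then request more every 10 seconds while the backlog persists ...
+  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "10")
+  // ... and remove any executor that has been idle for 10 minutes.
+  .set("spark.dynamicAllocation.executorIdleTimeout", "600")
+{% endhighlight %}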
+
 #### Security
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/15c03e1e/docs/job-scheduling.md
----------------------------------------------------------------------
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 94604f3..dfbb871 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -56,6 +56,114 @@ the same RDDs. For example, the 
[Shark](http://shark.cs.berkeley.edu) JDBC serve
 queries. In future releases, in-memory storage systems such as 
[Tachyon](http://tachyon-project.org) will
 provide another approach to share RDDs.
 
+## Dynamic Resource Allocation
+
+Spark 1.2 introduces the ability to dynamically scale the set of cluster 
resources allocated to
+your application up and down based on the workload. This means that your 
application may give
+resources back to the cluster if they are no longer used and request them 
again later when there
+is demand. This feature is particularly useful if multiple applications share 
resources in your
+Spark cluster. If a subset of the resources allocated to an application 
becomes idle, it can be
+returned to the cluster's pool of resources and acquired by other 
applications. In Spark, dynamic
+resource allocation is performed at the granularity of the executor and can be enabled through
+`spark.dynamicAllocation.enabled`.
+
+This feature is currently disabled by default and available only on 
[YARN](running-on-yarn.html).
+A future release will extend this to [standalone mode](spark-standalone.html) 
and
+[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes). Note that 
although Spark on
+Mesos already has a similar notion of dynamic resource sharing in fine-grained 
mode, enabling
+dynamic allocation allows your Mesos application to take advantage of 
coarse-grained low-latency
+scheduling while sharing cluster resources efficiently.
+
+### Configuration and Setup
+
+All configurations used by this feature live under the 
`spark.dynamicAllocation.*` namespace.
+To enable this feature, your application must set 
`spark.dynamicAllocation.enabled` to `true` and
+provide lower and upper bounds for the number of executors through
+`spark.dynamicAllocation.minExecutors` and 
`spark.dynamicAllocation.maxExecutors`. Other relevant
+configurations are described on the [configurations 
page](configuration.html#dynamic-allocation)
+and in the subsequent sections in detail.
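+
+A minimal sketch of the required settings just described, assuming they are set programmatically
+on the application's `SparkConf` (they could equally be placed in `spark-defaults.conf` or passed
+with `--conf`; the bounds below are example values, not recommendations):
+
+{% highlight scala %}
+import org.apache.spark.SparkConf
+
+val conf = new SparkConf()
+  .set("spark.dynamicAllocation.enabled", "true")    // turn the feature on
+  .set("spark.dynamicAllocation.minExecutors", "2")  // required lower bound (example value)
+  .set("spark.dynamicAllocation.maxExecutors", "20") // required upper bound (example value)
+  .set("spark.shuffle.service.enabled", "true")      // external shuffle service (see below)
+{% endhighlight %}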
+
+Additionally, your application must use an external shuffle service. The purpose of the
+service is to preserve the shuffle files written by executors so that the executors can be
+safely removed (described in more detail
+[below](job-scheduling.html#graceful-decommission-of-executors)). To enable this service,
+set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service is
+implemented as `org.apache.spark.network.yarn.YarnShuffleService`, which runs in each
+`NodeManager` in your cluster. To set up this service, follow these steps:
+
+1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
+pre-packaged distribution.
+2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
+`$SPARK_HOME/network/yarn/target/scala-<version>` if you are building Spark yourself, and under
+`lib` if you are using a distribution.
+3. Add this jar to the classpath of all `NodeManager`s in your cluster.
+4. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
+then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
+`org.apache.spark.network.yarn.YarnShuffleService` (a sketch of this step follows the list).
+Additionally, set all relevant `spark.shuffle.service.*` [configurations](configuration.html).
+5. Restart all `NodeManager`s in your cluster.
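+
+As a sketch of what step 4 might add to `yarn-site.xml` (if other auxiliary services are already
+configured, keep them in the comma-separated `yarn.nodemanager.aux-services` list):
+
+{% highlight xml %}
+<property>
+  <name>yarn.nodemanager.aux-services</name>
+  <value>spark_shuffle</value>
+</property>
+<property>
+  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
+  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
+</property>
+{% endhighlight %}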
+
+### Resource Allocation Policy
+
+At a high level, Spark should relinquish executors when they are no longer 
used and acquire
+executors when they are needed. Since there is no definitive way to predict 
whether an executor
+that is about to be removed will run a task in the near future, or whether a 
new executor that is
+about to be added will actually be idle, we need a set of heuristics to 
determine when to remove
+and request executors.
+
+#### Request Policy
+
+A Spark application with dynamic allocation enabled requests additional 
executors when it has
+pending tasks waiting to be scheduled. This condition necessarily implies that 
the existing set
+of executors is insufficient to simultaneously saturate all tasks that have 
been submitted but
+not yet finished.
+
+Spark requests executors in rounds. The actual request is triggered when there 
have been pending
+tasks for `spark.dynamicAllocation.schedulerBacklogTimeout` seconds, and then 
triggered again
+every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` seconds 
thereafter if the queue
+of pending tasks persists. Additionally, the number of executors requested in 
each round increases
+exponentially from the previous round. For instance, an application will add 1 
executor in the
+first round, and then 2, 4, 8 and so on executors in the subsequent rounds.
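+
+As a rough, hypothetical sketch of this ramp-up (not Spark's internal code), the number of
+executors added per round and the running total could be modeled as:
+
+{% highlight scala %}
+// Hypothetical model of the exponential ramp-up described above.
+// Round 0 adds 1 executor, round 1 adds 2, round 2 adds 4, and so on.
+def addedInRound(round: Int): Int = 1 << round
+
+// Total executors requested after `rounds` rounds of sustained backlog.
+// In practice the total is capped by spark.dynamicAllocation.maxExecutors.
+def totalAfter(rounds: Int): Int = (0 until rounds).map(addedInRound).sum
+{% endhighlight %}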
+
+The motivation for an exponential increase policy is twofold. First, an 
application should request
+executors cautiously in the beginning in case it turns out that only a few additional executors are
+sufficient. This echoes the justification for TCP slow start. Second, the 
application should be
+able to ramp up its resource usage in a timely manner in case it turns out 
that many executors are
+actually needed.
+
+#### Remove Policy
+
+The policy for removing executors is much simpler. A Spark application removes 
an executor when
+it has been idle for more than `spark.dynamicAllocation.executorIdleTimeout` 
seconds. Note that,
+under most circumstances, this condition is mutually exclusive with the 
request condition, in that
+an executor should not be idle if there are still pending tasks to be 
scheduled.
+
+### Graceful Decommission of Executors
+
+Before dynamic allocation, a Spark executor exits either on failure or when 
the associated
+application has also exited. In both scenarios, all state associated with the 
executor is no
+longer needed and can be safely discarded. With dynamic allocation, however, 
the application
+is still running when an executor is explicitly removed. If the application 
attempts to access
+state stored in or written by the executor, it will have to recompute the state. Thus,
+Spark needs a mechanism to decommission an executor gracefully by preserving 
its state before
+removing it.
+
+This requirement is especially important for shuffles. During a shuffle, the 
Spark executor first
+writes its own map outputs locally to disk, and then acts as the server for 
those files when other
+executors attempt to fetch them. In the event of stragglers, which are tasks 
that run for much
+longer than their peers, dynamic allocation may remove an executor before the 
shuffle completes,
+in which case the shuffle files written by that executor must be recomputed 
unnecessarily.
+
+The solution for preserving shuffle files is to use an external shuffle 
service, also introduced
+in Spark 1.2. This service refers to a long-running process that runs on each 
node of your cluster
+independently of your Spark applications and their executors. If the service 
is enabled, Spark
+executors will fetch shuffle files from the service instead of from each 
other. This means any
+shuffle state written by an executor may continue to be served beyond the 
executor's lifetime.
+
+In addition to writing shuffle files, executors also cache data either on disk 
or in memory.
+When an executor is removed, however, all cached data will no longer be 
accessible. There is
+currently no solution for this in Spark 1.2. In future releases, the cached data may be
+preserved through off-heap storage, similar in spirit to how shuffle files are preserved
+through the external shuffle service.
 
 # Scheduling Within an Application
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
