Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3653#discussion_r21652039
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -1238,49 +1333,260 @@ information on different persistence levels can be 
found in
     ***
     
     ## Checkpointing
    -A _stateful operation_ is one which operates over multiple batches of 
data. This includes all
    -window-based operations and the `updateStateByKey` operation. Since 
stateful operations have a
    -dependency on previous batches of data, they continuously accumulate 
metadata over time.
    -To clear this metadata, streaming supports periodic _checkpointing_ by 
saving intermediate data
    -to HDFS. Note that checkpointing also incurs the cost of saving to HDFS 
which may cause the
    -corresponding batch to take longer to process. Hence, the interval of 
checkpointing needs to be
    -set carefully. At small batch sizes (say 1 second), checkpointing every 
batch may significantly
    -reduce operation throughput. Conversely, checkpointing too slowly causes 
the lineage and task
    -sizes to grow which may have detrimental effects. Typically, a checkpoint 
interval of 5 - 10
    -times of sliding interval of a DStream is good setting to try.
    -
    -To enable checkpointing, the developer has to provide the HDFS path to 
which RDD will be saved.
    -This is done by using
    +A streaming application must operate 24/7 and hence must be resilient to 
failures unrelated
    +to the application logic (e.g., system failures, JVM crashes, etc.). For 
this to be possible,
    +Spark Streaming needs to *checkpoint* enough information to a fault-tolerant storage system such
    +that it can recover from failures. There are two types of data that are checkpointed.
    +
    +- *Metadata checkpointing* - Saving of the information defining the 
streaming computation to
    +  fault-tolerant storage like HDFS. This is used to recover from failure 
of the node running the
    +  driver of the streaming application (discussed in detail later). 
Metadata includes:
    +  +  *Configuration* - The configuration that was used to create the streaming application.
    +  +  *DStream operations* - The set of DStream operations that define the 
streaming application.
    +  +  *Incomplete batches* - Batches whose jobs are queued but have not 
completed yet.
    +- *Data checkpointing* - Saving of the generated RDDs to reliable storage. 
This is necessary
    +  in some *stateful* transformations that combine data across multiple 
batches. In such
    +  transformations, the generated RDDs depend on RDDs of previous batches, which causes the length
    +  of the dependency chain to keep increasing with time. To avoid such an unbounded increase in
    +  recovery time (proportional to the dependency chain), intermediate RDDs of stateful
    +  transformations are periodically *checkpointed* to reliable storage (e.g., HDFS) to cut off the
    +  dependency chains.
    +
    +To summarize, metadata checkpointing is primarily needed for recovery from 
driver failures,
    +whereas data or RDD checkpointing is necessary even for basic functioning 
if stateful
    +transformations are used.
    +
    +#### When to enable Checkpointing
    +{:.no_toc}
    +
    +Checkpointing must be enabled for applications with any of the following 
requirements:
    +
    +- *Usage of stateful transformations* - If either `updateStateByKey` or `reduceByKeyAndWindow` (with
    +  inverse function) is used in the application, then the checkpoint directory must be provided to
    +  allow periodic RDD checkpointing (a minimal sketch is shown at the end of this subsection).
    +- *Recovering from failures of the driver running the application* - Metadata checkpoints are used
    +  to recover with progress information.
    +
    +Note that simple streaming applications without the aforementioned 
stateful transformations can be
    +run without enabling checkpointing. The recovery from driver failures will 
also be partial in
    +that case (some received but unprocessed data may be lost). This is often 
acceptable and many run
    +Spark Streaming applications in this way. Support for non-Hadoop 
environments is expected
    +to improve in the future.
    +
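    +As a minimal sketch (not one of this guide's examples; the host, port, and checkpoint path below
    +are placeholders), a stateful word count that requires a checkpoint directory because it uses
    +`updateStateByKey` could look like this:
    +
    +{% highlight scala %}
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Seconds, StreamingContext}
    +import org.apache.spark.streaming.StreamingContext._
    +
    +val conf = new SparkConf().setAppName("StatefulNetworkWordCount")
    +val ssc = new StreamingContext(conf, Seconds(1))
    +
    +// Stateful transformations such as updateStateByKey require a checkpoint directory
    +ssc.checkpoint("hdfs://...")   // placeholder path on fault-tolerant storage
    +
    +val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    +
    +// Keep a running count per word across batches
    +val updateFunc = (newValues: Seq[Int], runningCount: Option[Int]) =>
    +  Some(newValues.sum + runningCount.getOrElse(0))
    +val runningCounts = words.map(w => (w, 1)).updateStateByKey[Int](updateFunc)
    +
    +runningCounts.print()
    +ssc.start()
    +ssc.awaitTermination()
    +{% endhighlight %}
    +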
    +#### How to configure Checkpointing
    +{:.no_toc}
    +
    +Checkpointing can be enabled by setting a directory in a fault-tolerant,
    +reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint 
information will be saved.
    +This is done by using `streamingContext.checkpoint(checkpointDirectory)`. 
This will allow you to
    +use the aforementioned stateful transformations. Additionally, if you want to make the application
    +recover from driver failures, you should rewrite your streaming application to have the following
    +behavior.
    +
    +  + When the program is being started for the first time, it will create a 
new StreamingContext,
    +    set up all the streams and then call start().
    +  + When the program is being restarted after failure, it will re-create a 
StreamingContext
    +    from the checkpoint data in the checkpoint directory.
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +
    +This behavior is made simple by using `StreamingContext.getOrCreate`. This 
is used as follows.
     
     {% highlight scala %}
    -ssc.checkpoint(hdfsPath) // assuming ssc is the StreamingContext or 
JavaStreamingContext
    +// Function to create and setup a new StreamingContext
    +def functionToCreateContext(): StreamingContext = {
    +    val ssc = new StreamingContext(...)   // new context
    +    val lines = ssc.socketTextStream(...) // create DStreams
    +    ...
    +    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    +    ssc
    +}
    +
    +// Get StreamingContext from checkpoint data or create a new one
    +val context = StreamingContext.getOrCreate(checkpointDirectory, 
functionToCreateContext _)
    +
    +// Do additional setup on context that needs to be done,
    +// irrespective of whether it is being started or restarted
    +context. ...
    +
    +// Start the context
    +context.start()
    +context.awaitTermination()
     {% endhighlight %}
     
    -The interval of checkpointing of a DStream can be set by using
    +If the `checkpointDirectory` exists, then the context will be recreated 
from the checkpoint data.
    +If the directory does not exist (i.e., running for the first time),
    +then the function `functionToCreateContext` will be called to create a new
    +context and set up the DStreams. See the Scala example
    
+[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
    +This example appends the word counts of network data into a file.
     
    -{% highlight scala %}
    -dstream.checkpoint(checkpointInterval)
    +</div>
    +<div data-lang="java" markdown="1">
    +
    +This behavior is made simple by using `JavaStreamingContext.getOrCreate`. 
This is used as follows.
    +
    +{% highlight java %}
    +// Create a factory object that can create and set up a new JavaStreamingContext
    +JavaStreamingContextFactory contextFactory = new 
JavaStreamingContextFactory() {
    +  @Override public JavaStreamingContext create() {
    +    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new 
context
    +    JavaDStream<String> lines = jssc.socketTextStream(...);     // create 
DStreams
    +    ...
    +    jssc.checkpoint(checkpointDirectory);                       // set 
checkpoint directory
    +    return jssc;
    +  }
    +};
    +
    +// Get JavaStreamingContext from checkpoint data or create a new one
    +JavaStreamingContext context = 
JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
    +
    +// Do additional setup on context that needs to be done,
    +// irrespective of whether it is being started or restarted
    +context. ...
    +
    +// Start the context
    +context.start();
    +context.awaitTermination();
    +{% endhighlight %}
    +
    +If the `checkpointDirectory` exists, then the context will be recreated 
from the checkpoint data.
    +If the directory does not exist (i.e., running for the first time),
    +then the factory `contextFactory` will be called to create a new
    +context and set up the DStreams. See the Java example
    
+[JavaRecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
    +This example appends the word counts of network data into a file.
    +
    +</div>
    +<div data-lang="python" markdown="1">
    +
    +This behavior is made simple by using `StreamingContext.getOrCreate`. This 
is used as follows.
    +
    +{% highlight python %}
    +# Function to create and setup a new StreamingContext
    +def functionToCreateContext():
    +    sc = SparkContext(...)   # new context
    +    ssc = StreamingContext(...)
    +    lines = ssc.socketTextStream(...) # create DStreams
    +    ...
    +    ssc.checkpoint(checkpointDirectory)   # set checkpoint directory
    +    return ssc
    +
    +# Get StreamingContext from checkpoint data or create a new one
    +context = StreamingContext.getOrCreate(checkpointDirectory, 
functionToCreateContext)
    +
    +# Do additional setup on context that needs to be done,
    +# irrespective of whether it is being started or restarted
    +context. ...
    +
    +# Start the context
    +context.start()
    +context.awaitTermination()
     {% endhighlight %}
     
    -For DStreams that must be checkpointed (that is, DStreams created by 
`updateStateByKey` and
    -`reduceByKeyAndWindow` with inverse function), the checkpoint interval of 
the DStream is by
    -default set to a multiple of the DStream's sliding interval such that its 
at least 10 seconds.
    +If the `checkpointDirectory` exists, then the context will be recreated 
from the checkpoint data.
    +If the directory does not exist (i.e., running for the first time),
    +then the function `functionToCreateContext` will be called to create a new
    +context and set up the DStreams. See the Python example
    
+[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
    +This example appends the word counts of network data into a file.
    +
    +You can also explicitly create a `StreamingContext` from the checkpoint data and start the
    +computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
    +
    +</div>
    +</div>
    +
    +In addition to using `getOrCreate`, one also needs to ensure that the driver process gets
    +restarted automatically on failure. This can only be done by the deployment infrastructure that is
    +used to run the application. This is further discussed in the
    +[Deployment](#deploying-applications) section.
    +
    +Note that checkpointing of RDDs incurs the cost of saving to reliable 
storage.
    +This may cause an increase in the processing time of those batches where 
RDDs get checkpointed.
    +Hence, the interval of
    +checkpointing needs to be set carefully. At small batch sizes (say 1 
second), checkpointing every
    +batch may significantly reduce operation throughput. Conversely, 
checkpointing too infrequently
    +causes the lineage and task sizes to grow which may have detrimental 
effects. For stateful
    +transformations that require RDD checkpointing, the default interval is a 
multiple of the
    +batch interval that is at least 10 seconds. It can be set by using
    +`dstream.checkpoint(checkpointInterval)`. Typically, a checkpoint interval of 5 - 10 sliding
    +intervals of a DStream is a good setting to try.
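    +
    +For instance, assuming the `runningCounts` DStream from the earlier sketch and a 1-second batch
    +interval, the checkpoint interval could be overridden as follows (the 10-second value is only an
    +illustrative choice, not a recommendation from this guide):
    +
    +{% highlight scala %}
    +import org.apache.spark.streaming.Seconds
    +
    +// Checkpoint the stateful DStream roughly every 10 batches (with a 1-second batch interval),
    +// instead of relying on the default checkpoint interval.
    +runningCounts.checkpoint(Seconds(10))
    +{% endhighlight %}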
     
     ***
     
     ## Deploying Applications
    -A Spark Streaming application is deployed on a cluster in the same way as 
any other Spark application.
    -Please refer to the [deployment guide](cluster-overview.html) for more 
details.
    +This section discusses the steps to deploy a Spark Streaming application.
     
    -Note that the applications
    -that use [advanced sources](#advanced-sources) (e.g. Kafka, Flume, 
Twitter) are also required to package the
    -extra artifact they link to, along with their dependencies, in the JAR 
that is used to deploy the application.
    -For example, an application using `TwitterUtils` will have to include
    -`spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and all its 
transitive
    -dependencies in the application JAR.
    +### Requirements
    +{:.no_toc}
     
    -If a running Spark Streaming application needs to be upgraded (with new 
application code), then
    -there are two possible mechanism.
    +To run a Spark Streaming application, you need to have the following.
    +
    +- *Cluster with a cluster manager* - This is the general requirement of any Spark application,
    +  and is discussed in detail in the [deployment guide](cluster-overview.html).
    +
    +- *Package the application JAR* - You have to compile your streaming 
application into a JAR.
    +  If you are using [`spark-submit`](submitting-applications.html) to start 
the
    +  application, then you will not need to provide Spark and Spark Streaming 
in the JAR. However,
    +  if your application uses [advanced sources](#advanced-sources) (e.g. 
Kafka, Flume, Twitter),
    +  then you will have to package the extra artifact they link to, along 
with their dependencies,
    +  in the JAR that is used to deploy the application. For example, an 
application using `TwitterUtils`
    +  will have to include 
`spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and all its
    +  transitive dependencies in the application JAR.
    +
    +- *Configuring sufficient memory for the executors* - Since the received 
data must be stored in
    +  memory, the executors must be configured with sufficient memory to hold 
the received data. Note
    +  that if you are doing 10-minute window operations, the system has to keep at least the last
    +  10 minutes of data in memory. So the memory requirements for the application depend on the
    +  operations used in it.
    +
    +- *Configuring checkpointing* - If the streaming application requires it, then a directory in
    +  Hadoop-API-compatible fault-tolerant storage (e.g., HDFS, S3, etc.) must be configured as the
    +  checkpoint directory, and the streaming application must be written in a way that checkpoint
    +  information can be used for failure recovery. See the
[checkpointing](#checkpointing) section
    +  for more details.
    +
    +- *Configuring automatic restart of the application driver* - To 
automatically recover from a
    +  driver failure, the deployment infrastructure that is
    +  used to run the streaming application must monitor the driver process 
and relaunch the driver
    +  if it fails. Different [cluster 
managers](cluster-overview.html#cluster-manager-types)
    +  have different tools to achieve this.
    +    + *Spark Standalone* - A Spark application driver can be submitted to 
run within the Spark
    +      Standalone cluster (see
    +      [cluster deploy 
mode](spark-standalone.html#launching-spark-applications)), that is, the
    +      application driver itself runs on one of the worker nodes. 
Furthermore, the
    +      Standalone cluster manager can be instructed to *supervise* the 
driver,
    +      and relaunch it if the driver fails either due to a non-zero exit code,
    +      or due to failure of the node running the driver. See *cluster mode* 
and *supervise* in the
    +      [Spark Standalone guide](spark-standalone.html) for more details.
    +    + *YARN* - YARN supports a similar mechanism for automatically
restarting an application.
    +      Please refer to YARN documentation for more details.
    +    + *Mesos* - [Marathon](https://github.com/mesosphere/marathon) has 
been used to achieve this
    +      with Mesos.
    +
    +
    +- *[Experimental in Spark 1.2] Configuring write ahead logs* - In Spark 
1.2,
    +  we have introduced a new experimental feature of write ahead logs for achieving strong
    +  fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into
    +  a write ahead log in the configured checkpoint directory. This prevents data loss on driver
    +  recovery, thus ensuring zero data loss (discussed in detail in the
    +  [Fault-tolerance Semantics](#fault-tolerance-semantics) section). This 
can be enabled by setting
    +  the [configuration parameter](configuration.html#spark-streaming)
    +  `spark.streaming.receiver.writeAheadLogs.enable` to `true`. However, 
this stronger semantics may
    +  come at the cost of the receiving throughput of individual receivers. 
can be corrected by running
    --- End diff --
    
    Typo here.  Looks like maybe a word was dropped at the start of this next 
sentence.

