Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222813906
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and custom write
    +logic to the output of a streaming query. They have slightly different use cases - while `foreach`
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations
    +and custom logic on the output of each micro-batch. Let's understand their usage in more detail.
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python.
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream().foreachBatch(
    +  new VoidFunction2<Dataset<String>, Long>() {
    +    public void call(Dataset<String> dataset, Long batchId) {
    +      // Transform and write dataset
    +    }
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write df
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With `foreachBatch`, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet,
    +  but there may already exist a data writer for batch queries. Using `foreachBatch()`, you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations,
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    --- End diff --
    
    `uncache()` -> `unpersist()`
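    
    For context: `Dataset` exposes `persist()`/`cache()` and `unpersist()`, but there is no `uncache()` method. A minimal runnable sketch of the corrected outline, assuming a streaming DataFrame named `streamingDF` as in the snippet above; the parquet format and output paths are illustrative placeholders, not text to copy into the guide:
    
    ```scala
    import org.apache.spark.sql.DataFrame
    
    streamingDF.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.cache()                                        // cache once so both writes reuse the same micro-batch data
        batchDF.write.mode("append").parquet("/tmp/out/loc1")  // location 1 - batch parquet writer (illustrative path)
        batchDF.write.mode("append").parquet("/tmp/out/loc2")  // location 2 (illustrative path)
        batchDF.unpersist()                                    // release the cached micro-batch
      }
      .start()
    ```
    
    This also illustrates the "reuse existing batch data sources" point above, since the writes inside the function go through the ordinary batch `DataFrameWriter`.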

