Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222813906

    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach`
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python.
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java" markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python" markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r" markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet,
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations,
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    --- End diff --

    `uncache()` -> `unpersist()`
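
For context on the suggestion: `Dataset` exposes `cache()`, `persist()` and `unpersist()`, but no `uncache()`, so the outline would not compile as written. Below is a minimal sketch of the write-to-multiple-locations pattern with that fix applied, assuming Spark 2.4 or later. The `rate` source, the `/tmp/foreach-batch-demo` paths, the output formats and the object name are illustrative placeholders and are not taken from the PR.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical demo object; the name and all paths below are placeholders.
object ForeachBatchMultiWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ForeachBatchMultiWrite")
      .master("local[2]")
      .getOrCreate()

    // Assumption: the built-in "rate" test source stands in for the guide's streamingDF.
    val streamingDF: DataFrame = spark.readStream.format("rate").load()

    // Binding the function to an explicitly typed val selects the Scala
    // overload of foreachBatch rather than the Java VoidFunction2 one.
    val writeBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.persist()
      try {
        batchDF.write.mode("append").parquet("/tmp/foreach-batch-demo/location1")
        batchDF.write.mode("append").json("/tmp/foreach-batch-demo/location2")
      } finally {
        // unpersist() is the counterpart of cache()/persist().
        batchDF.unpersist()
      }
    }

    val query = streamingDF.writeStream
      .option("checkpointLocation", "/tmp/foreach-batch-demo/checkpoint")
      .foreachBatch(writeBatch)
      .start()

    // Let the demo run for about ten seconds, then shut down.
    query.awaitTermination(10000L)
    query.stop()
    spark.stop()
  }
}
```

Wrapping the writes in `try`/`finally` is a design choice of this sketch, not something the PR proposes: it releases the cached micro-batch even if one of the writes throws.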