[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22627


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223490770
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long> {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epoch_id):
+    # Transform and write batchDF
+    pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With `foreachBatch`, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
`foreachBatch`, you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.persist()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using `foreachBatch`, you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223490811
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long> {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epoch_id):
+    # Transform and write batchDF
+    pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With `foreachBatch`, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
`foreachBatch`, you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.persist()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using `foreachBatch`, you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223491087
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long> {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epoch_id):
--- End diff --

nit: `foreachBatchFunction` -> `foreach_batch_function`
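
For illustration, here is a minimal sketch of the example with the snake_case name applied. It runs without Spark: `written_batches` is a hypothetical in-memory stand-in for an external sink, and `df` is a plain list standing in for the micro-batch DataFrame. The check on `epoch_id` follows the note in the diff about using the batch ID to deduplicate output; it is one possible way to do that, not the PR's own code.

```python
# Hypothetical in-memory stand-in for an external sink; not part of Spark.
written_batches = {}


def foreach_batch_function(df, epoch_id):
    """Write one micro-batch, skipping epochs that were already written."""
    if epoch_id in written_batches:
        # A replayed batch (for example after a restart) is ignored,
        # so repeated delivery does not duplicate output.
        return
    # `df` is a plain list here; with Spark it would be the micro-batch DataFrame.
    written_batches[epoch_id] = list(df)


# With Spark available, the function would be registered as in the diff:
# streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
```

Calling the function twice with the same `epoch_id` leaves a single copy of that batch in `written_batches`.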





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223491101
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long> {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epoch_id):
+    # Transform and write batchDF
+    pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
--- End diff --

nit: `foreachBatchFunction` -> `foreach_batch_function`





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223481016
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+    void call(Dataset<String> dataset, long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223479786
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not do the specified change as 
the restarted query is likely 
+  to fail with unpredictable errors. `sdf` represents a streaming 
DataFrame/Dataset 
+  generated with sparkSession.readStream.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files is generally not allowed as the 
results are unpredictable: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to output directory of a file sink is not allowed: 
`sdf.writeStream.format("parquet").option("path", "/somePath")` to 
`sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to output topic is allowed: 
`sdf.writeStream.format("kafka").option("topic", "someTopic")` to 
`sdf.writeStream.format("kafka").option("path", "anotherTopic")`
+
+  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` 
code) is allowed, but the semantics of the change depends on the code.
+
+- *Changes in projection / filter / map-like operations**: Some cases are 
allowed. For example:
+
+  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
+
+  - Changes in projections with same output schema is allowed: 
`sdf.selectExpr("stringColumn AS json").writeStream` to 
`sdf.select(to_json(...).as("json")).writeStream`.
+
+  - Changes in projections with different output schema are conditionally 
allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` 
is allowed only if the output sink allows the schema change from `"a"` to `"b"`.
+
+- *Changes in stateful operations*: Some operations in streaming queries 
need to maintain
+  state data in order to continuously update the result. Structured 
Streaming automatically checkpoints
+  the state data to fault-tolerant storage (for example, DBFS, AWS S3, 
Azure Blob storage) and restores it after restart.
--- End diff --

replaced with HDFS





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223479414
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+    void call(Dataset<String> dataset, long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223456294
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not do the specified change as 
the restarted query is likely 
+  to fail with unpredictable errors. `sdf` represents a streaming 
DataFrame/Dataset 
+  generated with sparkSession.readStream.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files is generally not allowed as the 
results are unpredictable: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to output directory of a file sink is not allowed: 
`sdf.writeStream.format("parquet").option("path", "/somePath")` to 
`sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to output topic is allowed: 
`sdf.writeStream.format("kafka").option("topic", "someTopic")` to 
`sdf.writeStream.format("kafka").option("path", "anotherTopic")`
+
+  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` 
code) is allowed, but the semantics of the change depends on the code.
+
+- *Changes in projection / filter / map-like operations**: Some cases are 
allowed. For example:
+
+  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
+
+  - Changes in projections with same output schema is allowed: 
`sdf.selectExpr("stringColumn AS json").writeStream` to 
`sdf.select(to_json(...).as("json")).writeStream`.
--- End diff --

Right. 





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223456079
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
--- End diff --

Yes. I missed a few, and I want to fix them all. 
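
For reference, the "write to multiple locations" outline in the quoted diff could be sketched in Python roughly as below. This is only a sketch under assumptions: the function name, output formats and `/tmp/out/...` paths are placeholders, and `streaming_df` is an assumed name for an existing streaming DataFrame. It uses `persist()` and `unpersist()`, which are the DataFrame methods for caching a DataFrame and releasing it again.

```python
def write_to_two_locations(batch_df, batch_id):
    """Write one micro-batch to two places, computing the batch only once."""
    # Cache the micro-batch so the second write does not recompute it
    # (and possibly re-read the input data).
    batch_df.persist()
    try:
        # Placeholder formats and paths; substitute the real batch writers.
        batch_df.write.format("parquet").mode("append").save("/tmp/out/location1")
        batch_df.write.format("json").mode("append").save("/tmp/out/location2")
    finally:
        # Release the cached data even if one of the writes fails.
        batch_df.unpersist()


# Hook-up, assuming `streaming_df` is an existing streaming DataFrame:
# query = streaming_df.writeStream.foreachBatch(write_to_two_locations).start()
```

The `try`/`finally` is a design choice rather than something the guide requires: it keeps a failed write from leaving the micro-batch cached.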


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222894056
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
--- End diff --

`foreachBatchFunction` -> `foreach_batch_function`


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222894110
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
--- End diff --

`epochId` -> `epoch_id`


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893790
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222894028
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
--- End diff --

4 space indentation
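
With the Python style points raised in this thread applied (snake_case function and parameter names, 4-space indentation), the example would read roughly as follows. This is a sketch only: the body is left as a placeholder, and `streaming_df` is an assumed name for an existing streaming DataFrame.

```python
def foreach_batch_function(df, epoch_id):
    # Transform and write the micro-batch DataFrame `df` here.
    pass


# Hook-up, assuming `streaming_df` is an existing streaming DataFrame:
# streaming_df.writeStream.foreachBatch(foreach_batch_function).start()
```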


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893841
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893720
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893572
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222872973
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
--- End diff --

Not a big deal but methods like `foreachBatch` are sometimes rendered that 
way, and sometimes without code font like foreachBatch(). It's nice to 
back-tick-quote class and method names if you are doing another pass.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222840402
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222813906
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
--- End diff --

`uncache()` -> `unpersist()`
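
For reference, a minimal sketch of the outline with the rename applied, written against the PySpark API. `persist()` and `unpersist()` are the actual DataFrame methods; the function name, the `parquet` format and both output paths are hypothetical placeholders, not values from the pull request.

```python
def write_to_two_locations(batch_df, batch_id):
    """Write one micro-batch to two sinks without recomputing it.

    Hypothetical sketch: the "parquet" format and both paths below are
    placeholders chosen for illustration only.
    """
    # Cache the micro-batch so the second write reuses the first computation.
    batch_df.persist()
    try:
        batch_df.write.format("parquet").mode("append").save("/tmp/location1")
        batch_df.write.format("parquet").mode("append").save("/tmp/location2")
    finally:
        # DataFrame exposes unpersist(); there is no uncache() method.
        batch_df.unpersist()


# Usage with a streaming DataFrame (needs a running Spark session):
# streamingDF.writeStream.foreachBatch(write_to_two_locations).start()
```

Putting `unpersist()` in a `finally` block is a design choice of this sketch, so the cached batch is released even if one of the writes fails.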





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815111
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815845
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222812874
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
--- End diff --

ditto





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815770
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222812936
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
--- End diff --

nit: `writeStream()`





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815072
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, the corresponding batch data 
writer does not exist, or you are using the 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
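
The three-method split can be sketched in plain Python as follows. `ListWriter` and `run_partition` are hypothetical; `run_partition` only imitates, in simplified form, the order in which an engine might call `open`, `process`, and `close` for one partition of one epoch, and is not Spark's implementation.

```python
class ListWriter:
    """Writer with the open/process/close shape described above."""

    def __init__(self):
        self.events = []

    def open(self, partition_id, epoch_id):
        self.events.append(("open", partition_id, epoch_id))
        return True  # True means: go ahead and process this partition

    def process(self, row):
        self.events.append(("process", row))

    def close(self, error):
        self.events.append(("close", error))


def run_partition(writer, partition_id, epoch_id, rows):
    """Hypothetical driver that mimics the call order; not Spark code."""
    should_process = writer.open(partition_id, epoch_id)
    error = None
    try:
        if should_process:
            for row in rows:
                writer.process(row)
    except Exception as exc:
        error = exc  # handed to close() so the writer can clean up
    finally:
        writer.close(error)


writer = ListWriter()
run_partition(writer, 0, 7, ["r1", "r2"])
```

Returning `False` from `open` in this sketch skips the rows but still reaches `close`, which is one way a writer could skip data it has already written for a given partition and epoch.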
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222818441
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not do the specified change as 
the restarted query is likely 
+  to fail with unpredictable errors. `sdf` represents a streaming 
DataFrame/Dataset 
+  generated with sparkSession.readStream.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files is generally not allowed as the 
results are unpredictable: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to output directory of a file sink is not allowed: 
`sdf.writeStream.format("parquet").option("path", "/somePath")` to 
`sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to output topic is allowed: 
`sdf.writeStream.format("kafka").option("topic", "someTopic")` to 
`sdf.writeStream.format("kafka").option("path", "anotherTopic")`
+
+  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` 
code) is allowed, but the semantics of the change depends on the code.
+
+- *Changes in projection / filter / map-like operations*: Some cases are 
allowed. For example:
+
+  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
+
+  - Changes in projections with same output schema is allowed: 
`sdf.selectExpr("stringColumn AS json").writeStream` to 
`sdf.select(to_json(...).as("json")).writeStream`.
--- End diff --

this example changes the schema. Right? From `string` to `struct`?





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222817163
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+    void call(Dataset<String> dataset, long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, the corresponding batch data 
writer does not exist, or you are using the 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222814573
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+    void call(Dataset<String> dataset, long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, the corresponding batch data 
writer does not exist, or you are using the 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222812757
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
--- End diff --

long -> Long. I noticed the current Java API actually is wrong. Submitted 
https://github.com/apache/spark/pull/22633 to fix it.





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222818615
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not do the specified change as 
the restarted query is likely 
+  to fail with unpredictable errors. `sdf` represents a streaming 
DataFrame/Dataset 
+  generated with sparkSession.readStream.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files is generally not allowed as the 
results are unpredictable: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to output directory of a file sink is not allowed: 
`sdf.writeStream.format("parquet").option("path", "/somePath")` to 
`sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to output topic is allowed: 
`sdf.writeStream.format("kafka").option("topic", "someTopic")` to 
`sdf.writeStream.format("kafka").option("path", "anotherTopic")`
+
+  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` 
code) is allowed, but the semantics of the change depends on the code.
+
+- *Changes in projection / filter / map-like operations*: Some cases are 
allowed. For example:
+
+  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
+
+  - Changes in projections with same output schema is allowed: 
`sdf.selectExpr("stringColumn AS json").writeStream` to 
`sdf.select(to_json(...).as("json")).writeStream`.
+
+  - Changes in projections with different output schema are conditionally 
allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` 
is allowed only if the output sink allows the schema change from `"a"` to `"b"`.
+
+- *Changes in stateful operations*: Some operations in streaming queries 
need to maintain
+  state data in order to continuously update the result. Structured 
Streaming automatically checkpoints
+  the state data to fault-tolerant storage (for example, DBFS, AWS S3, 
Azure Blob storage) and restores it after restart.
--- End diff --

remove `DBFS`?





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222818119
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not do the specified change as 
the restarted query is likely 
+  to fail with unpredictable errors. `sdf` represents a streaming 
DataFrame/Dataset 
+  generated with sparkSession.readStream.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files is generally not allowed as the 
results are unpredictable: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to output directory of a file sink is not allowed: 
`sdf.writeStream.format("parquet").option("path", "/somePath")` to 
`sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to output topic is allowed: 
`sdf.writeStream.format("kafka").option("topic", "someTopic")` to 
`sdf.writeStream.format("kafka").option("path", "anotherTopic")`
--- End diff --

nit: `path` -> `topic`





[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815808
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+    void call(Dataset<String> dataset, long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, the corresponding batch data 
writer does not exist, or you are using the 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815830
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+    void call(Dataset<String> dataset, long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, the corresponding batch data 
writer does not exist, or you are using the 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815089
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write the batch Dataset
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()  // DataFrame has no uncache(); unpersist() releases the cache
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, a corresponding batch data 
writer does not exist, or 
+you are using continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
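
As a rough illustration of the three-method split described above, here is a pure-Python sketch that simulates the call order for one partition. It does not use Spark: `ListWriter` and `run_partition` are hypothetical, and the parameter names (`partition_id`, `epoch_id`, `row`, `error`) are assumptions about the writer's shape rather than something this diff states.

```python
class ListWriter:
    """Hypothetical writer split into open / process / close."""

    def __init__(self):
        self.events = []  # records the order in which methods are called

    def open(self, partition_id, epoch_id):
        # Acquire resources here; returning False means "skip this partition".
        self.events.append(("open", partition_id, epoch_id))
        return True

    def process(self, row):
        # Write a single row.
        self.events.append(("process", row))

    def close(self, error):
        # Release resources; error is None when processing succeeded.
        self.events.append(("close", error))


def run_partition(writer, rows, partition_id, epoch_id):
    """Hypothetical driver that mimics the lifecycle; this is not Spark code."""
    error = None
    opened = writer.open(partition_id, epoch_id)
    try:
        if opened:
            for row in rows:
                writer.process(row)
    except Exception as exc:
        error = exc
        raise
    finally:
        writer.close(error)  # always runs once open() has returned


writer = ListWriter()
run_partition(writer, ["r1", "r2"], partition_id=0, epoch_id=7)
```

Keeping connection setup in `open` and teardown in `close` means `process` only has to deal with one row at a time.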
+
+
+
+
+In Scala, you have to extend the class `ForeachWriter` 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/22627

[SPARK-25639] [DOCS] Added docs for foreachBatch, python foreach and 
multiple watermarks

## What changes were proposed in this pull request?

Added
- Python foreach
- Scala, Java and Python foreachBatch
- Multiple watermark policy
- The semantics of what changes are allowed to the streaming query between 
restarts.

## How was this patch tested?
No tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-25639

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22627.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22627


commit f61c13ef0d4711a04b2774934641f7a4ac690165
Author: Tathagata Das 
Date:   2018-10-04T10:33:47Z

Added docs



