xuanyuanking commented on a change in pull request #31590: URL: https://github.com/apache/spark/pull/31590#discussion_r579049336
########## File path: docs/structured-streaming-programming-guide.md ########## @@ -2365,6 +2367,117 @@ When the streaming query is started, Spark calls the function or the object’s See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details. If you need deduplication on output, try out `foreachBatch` instead. +#### Streaming Table APIs +Since Spark 3.1, you can also use `DataStreamReader.table()` and `DataStreamWriter.toTable()` to read and write streaming DataFrames as tables: + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> + +{% highlight scala %} +val spark: SparkSession = ... + +// Create a streaming DataFrame +val df = spark.readStream + .format("rate") + .option("rowsPerSecond", 10) + .load() + +// Write the streaming DataFrame to a table +df.writeStream + .option("checkpointLocation", "path/to/checkpoint/dir") + .toTable("myTable") + +// Check the table result +spark.read.table("myTable").show() + +// Transform the source dataset and write to a new table +spark.readStream + .table("myTable") + .select("value") + .writeStream + .option("checkpointLocation", "path/to/checkpoint/dir") + .format("parquet") + .toTable("newTable") + +// Check the new table result +spark.read.table("newTable").show() +{% endhighlight %} + +</div> + +<div data-lang="java" markdown="1"> + +{% highlight java %} +SparkSession spark = ... + +// Create a streaming DataFrame +Dataset<Row> df = spark.readStream() + .format("rate") + .option("rowsPerSecond", 10) + .load(); + +// Write the streaming DataFrame to a table +df.writeStream() + .option("checkpointLocation", "path/to/checkpoint/dir") + .toTable("myTable"); + +// Check the table result +spark.read().table("myTable").show(); + +// Transform the source dataset and write to a new table +spark.readStream() + .table("myTable") + .select("value") + .writeStream() + .option("checkpointLocation", "path/to/checkpoint/dir") + .format("parquet") + .toTable("newTable"); + +// Check the new table result +spark.read().table("newTable").show(); +{% endhighlight %} + +</div> + +<div data-lang="python" markdown="1"> + +{% highlight python %} +spark = ... # spark session + +# Create a streaming DataFrame +df = spark.readStream \ + .format("rate") \ + .option("rowsPerSecond", 10) \ + .load() + +# Write the streaming DataFrame to a table +df.writeStream \ + .option("checkpointLocation", "path/to/checkpoint/dir") \ + .toTable("myTable") + +# Check the table result +spark.read.table("myTable").show() + +# Transform the source dataset and write to a new table +spark.readStream \ + .table("myTable") \ + .select("value") \ + .writeStream \ + .option("checkpointLocation", "path/to/checkpoint/dir") \ + .format("parquet") \ + .toTable("newTable") + +// Check the new table result +spark.read.table("newTable").show() +{% endhighlight %} + +</div> + +<div data-lang="r" markdown="1"> +Not available in R. Review comment: Maybe we can remove the `R` tab. ########## File path: docs/structured-streaming-programming-guide.md ########## @@ -2365,6 +2367,117 @@ When the streaming query is started, Spark calls the function or the object’s See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details. If you need deduplication on output, try out `foreachBatch` instead. +#### Streaming Table APIs +Since Spark 3.1, you can also use `DataStreamReader.table()` and `DataStreamWriter.toTable()` to read and write streaming DataFrames as tables: + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> + +{% highlight scala %} +val spark: SparkSession = ... + +// Create a streaming DataFrame +val df = spark.readStream + .format("rate") + .option("rowsPerSecond", 10) + .load() + +// Write the streaming DataFrame to a table +df.writeStream + .option("checkpointLocation", "path/to/checkpoint/dir") + .toTable("myTable") + +// Check the table result +spark.read.table("myTable").show() + +// Transform the source dataset and write to a new table +spark.readStream + .table("myTable") + .select("value") + .writeStream + .option("checkpointLocation", "path/to/checkpoint/dir") + .format("parquet") + .toTable("newTable") + +// Check the new table result +spark.read.table("newTable").show() +{% endhighlight %} + +</div> + +<div data-lang="java" markdown="1"> + +{% highlight java %} +SparkSession spark = ... + +// Create a streaming DataFrame +Dataset<Row> df = spark.readStream() + .format("rate") + .option("rowsPerSecond", 10) + .load(); + +// Write the streaming DataFrame to a table +df.writeStream() + .option("checkpointLocation", "path/to/checkpoint/dir") + .toTable("myTable"); + +// Check the table result +spark.read().table("myTable").show(); + +// Transform the source dataset and write to a new table +spark.readStream() + .table("myTable") + .select("value") + .writeStream() + .option("checkpointLocation", "path/to/checkpoint/dir") + .format("parquet") + .toTable("newTable"); + +// Check the new table result +spark.read().table("newTable").show(); +{% endhighlight %} + +</div> + +<div data-lang="python" markdown="1"> + +{% highlight python %} +spark = ... # spark session + +# Create a streaming DataFrame +df = spark.readStream \ + .format("rate") \ + .option("rowsPerSecond", 10) \ + .load() + +# Write the streaming DataFrame to a table +df.writeStream \ + .option("checkpointLocation", "path/to/checkpoint/dir") \ + .toTable("myTable") + +# Check the table result +spark.read.table("myTable").show() + +# Transform the source dataset and write to a new table +spark.readStream \ + .table("myTable") \ + .select("value") \ + .writeStream \ + .option("checkpointLocation", "path/to/checkpoint/dir") \ + .format("parquet") \ + .toTable("newTable") + +// Check the new table result Review comment: Change "//" to "#" since this is still in the Python div? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
