xuanyuanking commented on a change in pull request #31590:
URL: https://github.com/apache/spark/pull/31590#discussion_r579052287



##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -2365,6 +2367,117 @@ When the streaming query is started, Spark calls the function or the object’s
  See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.
   If you need deduplication on output, try out `foreachBatch` instead.
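
(Editor's note, not part of the quoted patch: the `foreachBatch` alternative mentioned in the context line above is typically used to make the output write idempotent across micro-batch retries. A rough sketch is below; `streamingDF`, the output path, and the batch-id directory layout are assumptions made only for illustration, and the example assumes `import org.apache.spark.sql.DataFrame`.)

{% highlight scala %}
// Hypothetical sketch: deduplicate output across query restarts by making the
// write idempotent with the batchId provided by foreachBatch.
streamingDF.writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // batchId is stable across retries of the same micro-batch, so writing to a
    // batchId-keyed location with overwrite mode replaces, rather than duplicates,
    // the data of a re-executed batch.
    batchDF.write
      .mode("overwrite")
      .format("parquet")
      .save(s"path/to/output/dir/batch_id=$batchId")
  }
  .start()
{% endhighlight %}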
 
+#### Streaming Table APIs
+Since Spark 3.1, you can also use `DataStreamReader.table()` and `DataStreamWriter.toTable()` to read and write streaming DataFrames as tables:
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+val spark: SparkSession = ...
+
+// Create a streaming DataFrame
+val df = spark.readStream
+  .format("rate")
+  .option("rowsPerSecond", 10)
+  .load()
+
+// Write the streaming DataFrame to a table
+df.writeStream
+  .option("checkpointLocation", "path/to/checkpoint/dir")
+  .toTable("myTable")
+
+// Check the table result
+spark.read.table("myTable").show()
+
+// Transform the source dataset and write to a new table
+spark.readStream
+  .table("myTable")
+  .select("value")
+  .writeStream
+  .option("checkpointLocation", "path/to/checkpoint/dir")
+  .format("parquet")
+  .toTable("newTable")
+
+// Check the new table result
+spark.read.table("newTable").show()
+{% endhighlight %}
+
+</div>
+
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+SparkSession spark = ...
+
+// Create a streaming DataFrame
+Dataset<Row> df = spark.readStream()
+  .format("rate")
+  .option("rowsPerSecond", 10)
+  .load();
+
+// Write the streaming DataFrame to a table
+df.writeStream()
+  .option("checkpointLocation", "path/to/checkpoint/dir")
+  .toTable("myTable");
+
+// Check the table result
+spark.read().table("myTable").show();
+
+// Transform the source dataset and write to a new table
+spark.readStream()
+  .table("myTable")
+  .select("value")
+  .writeStream()
+  .option("checkpointLocation", "path/to/checkpoint/dir")
+  .format("parquet")
+  .toTable("newTable");
+
+// Check the new table result
+spark.read().table("newTable").show();
+{% endhighlight %}
+
+</div>
+
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+spark = ...  # spark session
+
+# Create a streaming DataFrame
+df = spark.readStream \
+    .format("rate") \
+    .option("rowsPerSecond", 10) \
+    .load()
+
+# Write the streaming DataFrame to a table
+df.writeStream \
+    .option("checkpointLocation", "path/to/checkpoint/dir") \
+    .toTable("myTable")
+
+# Check the table result
+spark.read.table("myTable").show()
+
+# Transform the source dataset and write to a new table
+spark.readStream \
+    .table("myTable") \
+    .select("value") \
+    .writeStream \
+    .option("checkpointLocation", "path/to/checkpoint/dir") \
+    .format("parquet") \
+    .toTable("newTable")
+
+# Check the new table result
+spark.read.table("newTable").show()
+{% endhighlight %}
+
+</div>
+
+<div data-lang="r"  markdown="1">
+Not available in R.

Review comment:
       Never mind, I saw other places also list the `R` tab even though it's not available.



