HeartSaVioR commented on a change in pull request #31590:
URL: https://github.com/apache/spark/pull/31590#discussion_r579061128
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -712,6 +712,8 @@ csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep =
These examples generate streaming DataFrames that are untyped, meaning that
the schema of the DataFrame is not checked at compile time, only checked at
runtime when the query is submitted. Some operations like `map`, `flatMap`,
etc. need the type to be known at compile time. To do those, you can convert
these untyped streaming DataFrames to typed streaming Datasets using the same
methods as static DataFrame. See the [SQL Programming
Guide](sql-programming-guide.html) for more details. Additionally, more details
on the supported streaming sources are discussed later in the document.
+Alternatively, since Spark 3.1, you can create streaming DataFrames with
+`DataStreamReader.table()`. See [Streaming Table APIs](#streaming-table-apis)
+for more details.
Review comment:
`Alternatively` makes it sound to me like the API is just another way to read the same data, whereas the purpose is different (data source vs. table). We could simply remove it and state the purpose of the API in the sentence. Probably like below:
> Since Spark 3.1, you can also create streaming DataFrames from tables with `DataStreamReader.table()`. See [Streaming Table APIs](#streaming-table-apis) for more details.

or something clearer?
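For context, the untyped-to-typed conversion that the paragraph above refers to might look like the following sketch (illustrative only, not part of this diff; the `Device` case class and the JSON path are assumptions):

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

// Illustrative case class; any case class with an implicit Encoder works.
case class Device(device: String, deviceType: String, signal: Double)

val spark: SparkSession = SparkSession.builder().getOrCreate()
import spark.implicits._

// Untyped streaming DataFrame: the schema is only validated at runtime,
// when the query is submitted.
val df = spark.readStream
  .schema(Encoders.product[Device].schema)
  .json("/path/to/directory")

// Typed streaming Dataset: operations like map/flatMap are now
// checked at compile time.
val ds = df.as[Device].map(d => d.copy(signal = d.signal * 2))
```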
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -2365,6 +2367,117 @@ When the streaming query is started, Spark calls the function or the object’s
See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for
more details.
If you need deduplication on output, try out `foreachBatch` instead.
+#### Streaming Table APIs
+Since Spark 3.1, you can also use `DataStreamReader.table()` and
+`DataStreamWriter.toTable()` to read and write streaming DataFrames as tables:
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val spark: SparkSession = ...
+
+// Create a streaming DataFrame
+val df = spark.readStream
+ .format("rate")
+ .option("rowsPerSecond", 10)
+ .load()
+
+// Write the streaming DataFrame to a table
+df.writeStream
+ .option("checkpointLocation", "path/to/checkpoint/dir")
+ .toTable("myTable")
+
+// Check the table result
+spark.read.table("myTable").show()
+
+// Transform the source dataset and write to a new table
+spark.readStream
+ .table("myTable")
+ .select("value")
+ .writeStream
+ .option("checkpointLocation", "path/to/checkpoint/dir")
+ .format("parquet")
+ .toTable("newTable")
+
+// Check the new table result
+spark.read.table("newTable").show()
+{% endhighlight %}
+
+</div>
+
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+SparkSession spark = ...
+
+// Create a streaming DataFrame
+Dataset<Row> df = spark.readStream()
+ .format("rate")
+ .option("rowsPerSecond", 10)
+ .load();
+
+// Write the streaming DataFrame to a table
+df.writeStream()
+ .option("checkpointLocation", "path/to/checkpoint/dir")
+ .toTable("myTable");
+
+// Check the table result
+spark.read().table("myTable").show();
+
+// Transform the source dataset and write to a new table
+spark.readStream()
+ .table("myTable")
+ .select("value")
+ .writeStream()
+ .option("checkpointLocation", "path/to/checkpoint/dir")
+ .format("parquet")
+ .toTable("newTable");
+
+// Check the new table result
+spark.read().table("newTable").show();
+{% endhighlight %}
+
+</div>
+
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+spark = ... # spark session
+
+# Create a streaming DataFrame
+df = spark.readStream \
+ .format("rate") \
+ .option("rowsPerSecond", 10) \
+ .load()
+
+# Write the streaming DataFrame to a table
+df.writeStream \
+ .option("checkpointLocation", "path/to/checkpoint/dir") \
+ .toTable("myTable")
+
+# Check the table result
+spark.read.table("myTable").show()
+
+# Transform the source dataset and write to a new table
+spark.readStream \
+ .table("myTable") \
+ .select("value") \
+ .writeStream \
+ .option("checkpointLocation", "path/to/checkpoint/dir") \
+ .format("parquet") \
+ .toTable("newTable")
+
+# Check the new table result
+spark.read.table("newTable").show()
+{% endhighlight %}
+
+</div>
+
+<div data-lang="r" markdown="1">
+Not available in R.
+</div>
+</div>
Review comment:
While the code example describes the simple cases, I'd like to guide end users to check the API documentation for details before they use these methods. A simple sentence is OK, just a reminder to check the documentation, because the auto-creation of tables has gaps for v2 tables.
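As a rough illustration of the workaround such a note could point to (a sketch only, not part of the diff; it assumes the session catalog and the rate source's schema), a user hitting an auto-creation gap could pre-create the table before starting the query:

```scala
// Pre-create the target table explicitly instead of relying on
// toTable()'s auto-creation; the columns match the rate source.
spark.sql("CREATE TABLE myTable (timestamp TIMESTAMP, value LONG) USING parquet")

df.writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable")
```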