HeartSaVioR opened a new pull request, #36963:
URL: https://github.com/apache/spark/pull/36963

   ### What changes were proposed in this pull request?
   
   This PR proposes to expose catalog table (V1/V2) information in the logical 
plan of a streaming query, specifically in the parsed and analyzed plans. (Some 
of this information may be discarded in the optimized plan.)
   
   The major change is to propagate the catalog table information from the 
place where we resolve the table to the place where we execute the query. In 
MicroBatch execution, several transformations on the logical plan replace one 
node with another, so this PR touches the multiple logical nodes the code path 
passes through.
   
   Specifically for a DSv1 sink, we don't have a dedicated write logical node, 
so it is not feasible to expose the information for the destination. This PR 
introduces `WriteToMicroBatchDataSourceV1`, the DSv1 counterpart of 
`WriteToMicroBatchDataSource`, as a logical node for DSv1 sinks. Worth noting 
that `WriteToMicroBatchDataSourceV1` acts only as a marker - we eliminate this 
node in the streaming-specific optimization phase.
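
   The marker-node pattern can be sketched as follows (the node and rule names here are illustrative, NOT Spark's actual API): the marker surfaces the sink information in the parsed/analyzed plans, and an optimization step unwraps it so the executed plan is unchanged:
   
   ```scala
   // Hypothetical simplified plan types, for illustration only.
   sealed trait Plan
   case class Scan(table: String) extends Plan
   // Marker write node: carries the sink table name for display in
   // parsed/analyzed plans but performs no work itself.
   case class WriteMarkerV1(sinkTable: String, child: Plan) extends Plan
   
   // Streaming-specific optimization step: eliminate the marker,
   // leaving only its child for execution.
   def eliminateV1WriteMarker(plan: Plan): Plan = plan match {
     case WriteMarkerV1(_, child) => eliminateV1WriteMarker(child)
     case other                   => other
   }
   ```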
   
   ### Why are the changes needed?
   
   This PR gives a better UX to end users who use the table API for streaming 
queries. Previously it was not easy, or even feasible, to check which tables a 
streaming query reads from and writes to; most likely end users had to look 
into their code/query.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. In the parsed/analyzed plans, we now expose the table information in 
the read/write logical nodes. Specifically for DSv1, we introduce a marker 
write node to expose the destination information without major changes to 
existing logic.
   
   > DSv1 read and write
   
   >> Before the patch
   
   <img width="1635" alt="original-read-from-dsv1-write-to-dsv1" src="https://user-images.githubusercontent.com/1317309/175210731-dcc4cc4d-a70b-467d-b577-79c20600db32.png">
   
   >> After the patch
   
   <img width="1716" alt="proposal-read-from-dsv1-write-to-dsv1" src="https://user-images.githubusercontent.com/1317309/175210753-8f3c0b81-6ec5-45df-a2f8-37589960bba2.png">
   
   > DSv2 read and write
   
   >> Before the patch
   
   <img width="1684" alt="original-read-from-dsv2-write-to-dsv2" src="https://user-images.githubusercontent.com/1317309/175210780-4a99c670-8a42-4511-959c-cafe0c24bc00.png">
   
   >> After the patch
   
   <img width="1755" alt="proposal-read-from-dsv2-write-to-dsv2" src="https://user-images.githubusercontent.com/1317309/175210807-2b0b6935-e696-4051-b1a2-725d784d9d5f.png">
   
   ### How was this patch tested?
   
   New test cases. Also manually tested by running the following queries and 
checking the UI page:
   
   > DSv1 read and write
   
   ```scala
   /*
   ./bin/spark-shell --conf "spark.sql.ui.explainMode=extended"
   */
   
   spark.sql("drop table if exists stream_source")
   
   spark.sql("drop table if exists stream_target")
   
   spark.sql("create table stream_source (col1 string, col2 int) using parquet")
   
   spark.sql("create table stream_target (col1 string, col2 int) using parquet")
   
   val checkpointDir = java.nio.file.Files.createTempDirectory("checkpoint-")
   
   val q = spark.readStream.table("stream_source")
     .writeStream.format("parquet")
     .option("checkpointLocation", checkpointDir.toString)
     .toTable("stream_target")
   
   Thread.sleep(10000)
   
   spark.sql("insert into stream_source values ('a', 1)")
   spark.sql("insert into stream_source values ('a', 2)")
   spark.sql("insert into stream_source values ('a', 3)")
   
   q.processAllAvailable()
   
   spark.sql("insert into stream_source values ('b', 1)")
   spark.sql("insert into stream_source values ('b', 2)")
   spark.sql("insert into stream_source values ('b', 3)")
   
   q.processAllAvailable()
   
   spark.sql("insert into stream_source values ('c', 1)")
   spark.sql("insert into stream_source values ('c', 2)")
   spark.sql("insert into stream_source values ('c', 3)")
   
   q.processAllAvailable()
   
   q.stop()
   ```
   
   > DSv2 read and write
   
   ```scala
   /*
   ./bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1 \
       --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.local.type=hadoop \
       --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
       --conf spark.sql.ui.explainMode=extended
   */
   
   
   spark.sql("drop table if exists local.db.stream_target")
   
   spark.sql("create table local.db.stream_source (col1 string, col2 int) using iceberg")
   
   spark.sql("create table local.db.stream_target (col1 string, col2 int) using iceberg")
   
   val checkpointDir = java.nio.file.Files.createTempDirectory("checkpoint-")
   
   val q = spark.readStream.table("local.db.stream_source")
     .writeStream.format("iceberg")
     .option("checkpointLocation", checkpointDir.toString)
     .toTable("local.db.stream_target")
   
   Thread.sleep(10000)
   
   spark.sql("insert into local.db.stream_source values ('a', 1)")
   spark.sql("insert into local.db.stream_source values ('a', 2)")
   spark.sql("insert into local.db.stream_source values ('a', 3)")
   
   q.processAllAvailable()
   
   spark.sql("insert into local.db.stream_source values ('b', 1)")
   spark.sql("insert into local.db.stream_source values ('b', 2)")
   spark.sql("insert into local.db.stream_source values ('b', 3)")
   
   q.processAllAvailable()
   
   spark.sql("insert into local.db.stream_source values ('c', 1)")
   spark.sql("insert into local.db.stream_source values ('c', 2)")
   spark.sql("insert into local.db.stream_source values ('c', 3)")
   
   q.processAllAvailable()
   
   q.stop()
   ```
   

