HeartSaVioR opened a new pull request, #36963: URL: https://github.com/apache/spark/pull/36963
### What changes were proposed in this pull request?

This PR proposes to expose the catalog table information (V1/V2) in the logical plan of a streaming query, specifically in the parsed and analyzed plans. (Some of the information may be discarded in the optimized plan.)

The major change is to propagate the catalog table information from the place where the table is resolved to the place where the query is executed. In micro-batch execution we apply several transformations on the logical plan which replace one node with another, hence this PR touches the multiple logical nodes the code path passes through.

For a DSv1 sink specifically, there is no dedicated write logical node, so there is no place to expose the information for the destination. This PR introduces `WriteToMicroBatchDataSourceV1`, the DSv1 counterpart of `WriteToMicroBatchDataSource`, as the logical node for a DSv1 sink. It is worth noting that `WriteToMicroBatchDataSourceV1` acts only as a marker: the node is eliminated in the streaming-specific optimization phase.

### Why are the changes needed?

This PR gives a better UX to end users who use the table API in streaming queries. Previously it was hard, or even infeasible, to check which tables a streaming query reads from and writes to; most likely end users had to look into their code/query.

### Does this PR introduce _any_ user-facing change?

Yes. In the parsed/analyzed plan we now expose the table information in the read/write logical nodes. For DSv1 specifically, we introduce a marker write node to expose the information for the destination without major changes to the existing logic.
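The marker-node idea described above can be sketched outside of Spark as a tiny tree rewrite. All names below (`Plan`, `Scan`, `MarkerWrite`, `eliminateMarker`) are hypothetical and chosen for illustration only; Spark's real nodes live in Catalyst and carry much more state:

```scala
// Minimal sketch of the "marker node" pattern: a node that only carries
// metadata (here, the destination table) through the analyzed plan, and is
// stripped out again before execution. Names are illustrative, not Spark's API.
sealed trait Plan
case class Scan(table: String) extends Plan
// Marker node: exposes the destination table name in the analyzed plan only.
case class MarkerWrite(destTable: String, child: Plan) extends Plan

// "Streaming-specific optimization": eliminate the marker, keeping its child.
def eliminateMarker(plan: Plan): Plan = plan match {
  case MarkerWrite(_, child) => eliminateMarker(child)
  case other                 => other
}

val analyzed  = MarkerWrite("stream_target", Scan("stream_source"))
val optimized = eliminateMarker(analyzed)
// `analyzed` still shows the destination table; `optimized` is just the scan.
```

The point of the pattern is that the marker changes what the plan *displays* without changing what it *computes*, which is why it is safe to drop it during optimization.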
> DSv1 read and write

>> Before the patch

<img width="1635" alt="original-read-from-dsv1-write-to-dsv1" src="https://user-images.githubusercontent.com/1317309/175210731-dcc4cc4d-a70b-467d-b577-79c20600db32.png">

>> After the patch

<img width="1716" alt="proposal-read-from-dsv1-write-to-dsv1" src="https://user-images.githubusercontent.com/1317309/175210753-8f3c0b81-6ec5-45df-a2f8-37589960bba2.png">

> DSv2 read and write

>> Before the patch

<img width="1684" alt="original-read-from-dsv2-write-to-dsv2" src="https://user-images.githubusercontent.com/1317309/175210780-4a99c670-8a42-4511-959c-cafe0c24bc00.png">

>> After the patch

<img width="1755" alt="proposal-read-from-dsv2-write-to-dsv2" src="https://user-images.githubusercontent.com/1317309/175210807-2b0b6935-e696-4051-b1a2-725d784d9d5f.png">

### How was this patch tested?

New test cases. Also manually tested by running the following queries and checking the UI page:

> DSv1 read and write

```
/* ./bin/spark-shell --conf "spark.sql.ui.explainMode=extended" */

spark.sql("drop table if exists stream_source")
spark.sql("drop table if exists stream_target")
spark.sql("create table stream_source (col1 string, col2 int) using parquet")
spark.sql("create table stream_target (col1 string, col2 int) using parquet")

val checkpointDir = java.nio.file.Files.createTempDirectory("checkpoint-")
val q = spark.readStream.table("stream_source").writeStream.format("parquet").option("checkpointLocation", checkpointDir.toString).toTable("stream_target")

Thread.sleep(10000)

spark.sql("insert into stream_source values ('a', 1)")
spark.sql("insert into stream_source values ('a', 2)")
spark.sql("insert into stream_source values ('a', 3)")
q.processAllAvailable()

spark.sql("insert into stream_source values ('b', 1)")
spark.sql("insert into stream_source values ('b', 2)")
spark.sql("insert into stream_source values ('b', 3)")
q.processAllAvailable()

spark.sql("insert into stream_source values ('c', 1)")
spark.sql("insert into stream_source values ('c', 2)")
spark.sql("insert into stream_source values ('c', 3)")
q.processAllAvailable()

q.stop()
```

> DSv2 read and write

```
/*
./bin/spark-shell \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1 \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
  --conf spark.sql.ui.explainMode=extended
*/

spark.sql("drop table if exists local.db.stream_target")
spark.sql("create table local.db.stream_source (col1 string, col2 int) using iceberg")
spark.sql("create table local.db.stream_target (col1 string, col2 int) using iceberg")

val checkpointDir = java.nio.file.Files.createTempDirectory("checkpoint-")
val q = spark.readStream.table("local.db.stream_source").writeStream.format("iceberg").option("checkpointLocation", checkpointDir.toString).toTable("local.db.stream_target")

Thread.sleep(10000)

spark.sql("insert into local.db.stream_source values ('a', 1)")
spark.sql("insert into local.db.stream_source values ('a', 2)")
spark.sql("insert into local.db.stream_source values ('a', 3)")
q.processAllAvailable()

spark.sql("insert into local.db.stream_source values ('b', 1)")
spark.sql("insert into local.db.stream_source values ('b', 2)")
spark.sql("insert into local.db.stream_source values ('b', 3)")
q.processAllAvailable()

spark.sql("insert into local.db.stream_source values ('c', 1)")
spark.sql("insert into local.db.stream_source values ('c', 2)")
spark.sql("insert into local.db.stream_source values ('c', 3)")
q.processAllAvailable()

q.stop()
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
