HeartSaVioR opened a new pull request #30835:
URL: https://github.com/apache/spark/pull/30835
### What changes were proposed in this pull request?
This PR proposes to expose DataStreamReader.table (SPARK-32885) and
DataStreamWriter.toTable (SPARK-32896) to PySpark. These are the only APIs for
reading from and writing to tables in Structured Streaming.
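Here is a minimal sketch of how the two newly exposed APIs fit together. The hypothetical helper below streams the rows of one table into another. It assumes an active `SparkSession` on PySpark 3.1 or later. The helper name and parameters are illustrative only. `checkpointLocation` is passed as a plain writer option, as in the manual tests.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # imported only for type hints, so the sketch loads without PySpark
    from pyspark.sql import SparkSession
    from pyspark.sql.streaming import StreamingQuery


def copy_table_stream(
    spark: "SparkSession", source: str, target: str, checkpoint: str
) -> "StreamingQuery":
    """Hypothetical helper: continuously append the rows of `source` into `target`."""
    # DataStreamReader.table: read an existing table as a streaming source.
    df = spark.readStream.table(source)
    # DataStreamWriter.toTable: start the query writing into `target`;
    # extra keyword arguments are forwarded as writer options.
    return df.writeStream.toTable(target, checkpointLocation=checkpoint)
```

For example, `copy_table_stream(spark, 'table_pyspark_parquet', 'table_pyspark_parquet_nonexist', '/tmp/checkpoint2')` mirrors the second v1 step below.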
### Why are the changes needed?
Please refer to SPARK-32885 and SPARK-32896 for the rationale behind these
public APIs. This PR only exposes them to PySpark.
### Does this PR introduce _any_ user-facing change?
Yes, PySpark users will be able to read from and write to tables in Structured
Streaming queries.
### How was this patch tested?
Manually tested.
> v1 table
>> create table A and ingest into table A
```
spark.sql("""
create table table_pyspark_parquet (
value long,
`timestamp` timestamp
) USING parquet
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.writeStream.toTable('table_pyspark_parquet',
                               checkpointLocation='/tmp/checkpoint5')
query.lastProgress
query.stop()
```
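The manual steps of letting a query run, inspecting `lastProgress` and stopping it could be bundled into one call. The sketch below does this with a hypothetical helper. It relies on `StreamingQuery.awaitTermination(timeout)`, `lastProgress` and `stop()`. The helper itself and its default duration are assumptions for illustration.

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:  # imported only for type hints
    from pyspark.sql.streaming import StreamingQuery


def run_briefly(query: "StreamingQuery", seconds: float = 10.0) -> Optional[dict]:
    """Hypothetical helper: let a streaming query run for a while, then stop it.

    Returns the last progress report, or None if no micro-batch has completed.
    """
    try:
        # awaitTermination(timeout) waits up to `timeout` seconds and re-raises
        # the error if the query failed in the meantime.
        query.awaitTermination(seconds)
        return query.lastProgress
    finally:
        # Always stop the query, even if waiting raised.
        query.stop()
```

Usage: `progress = run_briefly(df.writeStream.toTable('table_pyspark_parquet', checkpointLocation='/tmp/checkpoint5'))`.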
>> read table A and ingest into table B, which doesn't exist yet
```
df2 = spark.readStream.table('table_pyspark_parquet')
query2 = df2.writeStream.toTable('table_pyspark_parquet_nonexist',
                                 checkpointLocation='/tmp/checkpoint2')
query2.lastProgress
query2.stop()
```
>> query the tables
```
spark.sql("SELECT * FROM table_pyspark_parquet").show()
spark.sql("SELECT * FROM table_pyspark_parquet_nonexist").show()
```
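To turn the visual `.show()` check into something assertable, one option is to count the rows in each table. The sketch below uses the hypothetical helper `row_counts`, which relies only on `spark.table(name).count()`. A non-zero count is an assumption about what "ingestion worked" means here.

```python
from typing import TYPE_CHECKING, Dict, Iterable

if TYPE_CHECKING:  # imported only for type hints
    from pyspark.sql import SparkSession


def row_counts(spark: "SparkSession", tables: Iterable[str]) -> Dict[str, int]:
    """Hypothetical helper: count the rows currently stored in each table."""
    # spark.table(name) returns a batch DataFrame over the table's current contents.
    return {name: spark.table(name).count() for name in tables}
```

Usage: `counts = row_counts(spark, ['table_pyspark_parquet', 'table_pyspark_parquet_nonexist'])`, followed by `assert all(n > 0 for n in counts.values())`.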
> v2 table (leveraging Apache Iceberg, as it provides both a V2 table and a custom catalog)
>> create table A and ingest into table A
```
spark.sql("""
create table iceberg_catalog.default.table_pyspark_v2table (
value long,
`timestamp` timestamp
) USING iceberg
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.select('value', 'timestamp').writeStream.toTable(
    'iceberg_catalog.default.table_pyspark_v2table',
    checkpointLocation='/tmp/checkpoint_v2table_1')
query.lastProgress
query.stop()
```
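The rate source emits `timestamp` before `value`. The step above selects `value, timestamp` explicitly so that the columns line up with the table definition. The sketch below derives that order from the existing table instead of hardcoding it. The helper is hypothetical and only applies when the target table already exists.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # imported only for type hints
    from pyspark.sql import DataFrame, SparkSession


def align_to_table(spark: "SparkSession", df: "DataFrame", table: str) -> "DataFrame":
    """Hypothetical helper: reorder `df`'s columns to match an existing table."""
    # spark.table(table).columns lists the table's columns in definition order,
    # e.g. ['value', 'timestamp'] for the table created above.
    return df.select(*spark.table(table).columns)
```

Usage: `align_to_table(spark, df, 'iceberg_catalog.default.table_pyspark_v2table').writeStream.toTable(...)`.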
>> ingest into table B, which doesn't exist yet
```
df2 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query2 = df2.select('value', 'timestamp').writeStream.toTable(
    'iceberg_catalog.default.table_pyspark_v2table_nonexist',
    checkpointLocation='/tmp/checkpoint_v2table_2')
query2.lastProgress
query2.stop()
```
>> ingest into table C, which doesn't exist yet, partitioned by `value % 10`
```
df3 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query3 = (df3.selectExpr('value', 'timestamp', 'value % 10 AS partition')
          .repartition('partition')
          .writeStream
          .partitionBy('partition')
          .toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned',
                   checkpointLocation='/tmp/checkpoint_v2table_4'))
query3.lastProgress
query3.stop()
```
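The partitioned step could be parameterized on the partition expression. The sketch below wraps it in a hypothetical helper. The helper name, its parameters and the use of `"*"` to keep all input columns are assumptions. The call chain (`selectExpr`, `repartition`, `writeStream.partitionBy`, `toTable`) is the one the manual test uses.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # imported only for type hints
    from pyspark.sql import DataFrame
    from pyspark.sql.streaming import StreamingQuery


def write_partitioned(
    df: "DataFrame",
    table: str,
    partition_expr: str,
    checkpoint: str,
    partition_col: str = "partition",
) -> "StreamingQuery":
    """Hypothetical helper: stream `df` into `table`, partitioned by an expression."""
    return (
        # Keep every input column and add the derived partition column.
        df.selectExpr("*", f"{partition_expr} AS {partition_col}")
        # Hash-partition by the partition column so rows sharing a value land
        # in the same task, as the manual test does before writing.
        .repartition(partition_col)
        .writeStream
        # Request partitioning on the writer, as in the manual test above.
        .partitionBy(partition_col)
        .toTable(table, checkpointLocation=checkpoint)
    )
```

Usage: `write_partitioned(df3, 'iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned', 'value % 10', '/tmp/checkpoint_v2table_4')`.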
>> query the tables
```
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table").show()
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist").show()
spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
```