HyukjinKwon opened a new pull request, #36082:
URL: https://github.com/apache/spark/pull/36082
### What changes were proposed in this pull request?
This PR adds support for `DataFrame.observe` with Structured Streaming in
PySpark, matching the existing support on the Scala side.
### Why are the changes needed?
So that Structured Streaming users in PySpark can easily monitor their streaming queries.
### Does this PR introduce _any_ user-facing change?
Yes. After this PR, PySpark users can do:
```python
from pyspark.sql.functions import count, col, sum, lit
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        # Do something with the metric, e.g., row.cnt and row.sum.
        print("Count: %s" % row.cnt)

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(MyListener())

df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
observed_df = df.observe(
    "metric", count(lit(1)).alias("cnt"), sum(col("value")).alias("sum"))
observed_df.writeStream.format("noop").queryName("test").start()
```
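For comparison (not part of this PR), batch DataFrames already expose observed metrics through the `pyspark.sql.Observation` helper, which does not work with streaming queries; that is why the streaming example above passes a plain name and reads the metrics from a listener. A minimal batch sketch, assuming Spark 3.3+ and a local session:

```python
from pyspark.sql import Observation, SparkSession
from pyspark.sql.functions import count, lit

spark = SparkSession.builder.getOrCreate()

# Batch path: attach a named Observation and read it back after an action.
observation = Observation("metric")
observed = spark.range(10).observe(observation, count(lit(1)).alias("cnt"))
observed.count()        # any action materializes the observed metrics
print(observation.get)  # {'cnt': 10}
```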
### How was this patch tested?
Manually tested with the example above, and a unit test was added.
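For illustration only, a rough sketch of how such a test could look (this is not the PR's actual test; it reuses the active `spark` session from the example above, and `CollectingListener` is a hypothetical name):

```python
import time

from pyspark.sql.functions import count, lit
from pyspark.sql.streaming import StreamingQueryListener

collected = []

class CollectingListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # Progress events arrive on a listener thread; stash the metric row.
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            collected.append(row.cnt)

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(CollectingListener())
sdf = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
query = (sdf.observe("metric", count(lit(1)).alias("cnt"))
         .writeStream.format("noop").start())
try:
    # Wait until at least one micro-batch has reported the observed metric.
    while not collected and query.isActive:
        time.sleep(1)
finally:
    query.stop()

assert collected and all(c >= 0 for c in collected)
```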