HyukjinKwon opened a new pull request, #36082:
URL: https://github.com/apache/spark/pull/36082

   ### What changes were proposed in this pull request?
   
   This PR adds Structured Streaming support to `DataFrame.observe` in PySpark, matching the existing support on the Scala side.
   
   ### Why are the changes needed?
   
   So that Structured Streaming users in PySpark can easily monitor their streaming queries.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. After this PR, PySpark users can do:
   
   ```python
   from pyspark.sql.functions import count, col, sum, lit
   from pyspark.sql.streaming import StreamingQueryListener
   
   class MyListener(StreamingQueryListener):
       def onQueryStarted(self, event):
           pass
       def onQueryProgress(self, event):
           row = event.progress.observedMetrics.get("metric")
        # Do something with the metrics, e.g. row.cnt and row.sum
           print("Count: %s" % row.cnt)
       def onQueryTerminated(self, event):
           pass
   
   spark.streams.addListener(MyListener())
   
   df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
    observed_df = df.observe("metric", count(lit(1)).alias("cnt"), sum(col("value")).alias("sum"))
   observed_df.writeStream.format("noop").queryName("test").start()
   ```
   
   ### How was this patch tested?
   
   Manually tested with the example above, and a unit test was added.
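
   As a rough sketch (not the actual unit test added in this PR), streaming `observe` can be exercised end to end by starting a query against the built-in `rate` source and writing to the `noop` sink; the local session setup below is an assumption for illustration:

   ```python
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import count, lit

   # Hypothetical local session for this sketch; a real test would reuse
   # PySpark's shared test SparkSession instead.
   spark = (SparkSession.builder
            .master("local[2]")
            .appName("observe-streaming-sketch")
            .getOrCreate())

   df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
   observed_df = df.observe("metric", count(lit(1)).alias("cnt"))

   query = observed_df.writeStream.format("noop").start()
   try:
       # Block until all data available so far has been processed;
       # observed metrics are then reported through the query progress.
       query.processAllAvailable()
       assert query.isActive
   finally:
       query.stop()
       spark.stop()
   ```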


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

