amaliujia commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051069651
##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
melt = unpivot
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
Review Comment:
Should this be 3.4.0?
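
For context, here is a minimal sketch of how the batch side of this API is used, assuming the Spark Connect implementation mirrors the existing `pyspark.sql.DataFrame.observe` semantics (the data, column names, and the `my_metrics` observation name are illustrative):

```python
from pyspark.sql import Observation, SparkSession
from pyspark.sql.functions import count, lit, sum

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 10), (2, 20)], ["id", "value"])

# Attach named metrics; they are computed over exactly the rows that
# flow through this point of the plan.
observation = Observation("my_metrics")
observed = df.observe(
    observation,
    count(lit(1)).alias("rows"),   # a literal wrapped in an aggregate
    sum("value").alias("total"),   # an aggregate over an input column
)

observed.collect()      # an action must complete before metrics are available
print(observation.get)  # e.g. {'rows': 2, 'total': 30}
```

In batch mode the metrics become available once an action completes; for streaming queries, a `StreamingQueryListener` is the way to receive them, as the docstring notes.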