zhengruifeng commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051733557


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        observation : :class:`Observation` or str
+            `str` to specify the name, or an :class:`Observation` instance to obtain the metric.
+
+            .. versionchanged:: 3.4.0
+               Added support for `str` in this parameter.

Review Comment:
   ```suggestion
   
   ```



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        observation : :class:`Observation` or str
+            `str` to specify the name, or an :class:`Observation` instance to obtain the metric.
+
+            .. versionchanged:: 3.4.0
+               Added support for `str` in this parameter.
+        exprs : :class:`Column`
+            column expressions (:class:`Column`).
+
+        Returns
+        -------
+        :class:`DataFrame`
+            the observed :class:`DataFrame`.
+
+        Notes
+        -----
+        When ``observation`` is :class:`Observation`, this method only supports batch queries.
+        When ``observation`` is a string, this method works for both batch and streaming queries.
+        Continuous execution is currently not supported yet.
+        """
+        from pyspark.sql import Observation
+
+        if len(exprs) == 0:
+            raise ValueError("'exprs' should not be empty")
+        if not all(isinstance(c, Column) for c in exprs):
+            raise ValueError("all 'exprs' should be Column")
+
+        if isinstance(observation, Observation):
+            return DataFrame.withPlan(
+                plan.CollectMetrics(self._plan, str(observation._name), list(exprs), True),

Review Comment:
   Is the existing PySpark implementation equivalent to this?
   https://github.com/apache/spark/blob/c014fa2e18713f67deb072a4336286f4a3f4d3f4/python/pyspark/sql/observation.py#L86-L110
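
   For reference when comparing the two code paths, the argument checks in the
   hunk above can be exercised in isolation. This is only a minimal sketch: the
   `Column` class below is a hypothetical stand-in, not the Connect `Column`, and
   `check_exprs` is an illustrative helper name that does not exist in the PR.

```python
from typing import Any, List


class Column:
    """Hypothetical stand-in for the Connect Column class (assumption)."""

    def __init__(self, name: str) -> None:
        self.name = name


def check_exprs(*exprs: Any) -> List[Column]:
    """Mirror the two argument checks shown in the hunk above."""
    # At least one metric expression is required.
    if len(exprs) == 0:
        raise ValueError("'exprs' should not be empty")
    # Every metric expression must be a Column.
    if not all(isinstance(c, Column) for c in exprs):
        raise ValueError("all 'exprs' should be Column")
    return list(exprs)
```

   With these checks, a call with no expressions or with a plain string among
   the expressions fails before any plan node is built.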



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0

Review Comment:
   ```suggestion
           .. versionadded:: 3.4.0
   ```
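
   The completion-point guarantee in the docstring quoted above (aggregates
   reflect only the data processed since the previous completion point) can be
   illustrated with a plain-Python model. This is a sketch of the semantics only
   and does not use Spark; `observe_epochs` and the metric names are hypothetical.

```python
from typing import Callable, Dict, Iterable, List, Sequence

Metric = Callable[[Sequence[int]], int]


def observe_epochs(
    epochs: Iterable[Iterable[int]], metrics: Dict[str, Metric]
) -> List[Dict[str, int]]:
    """Report each metric once per epoch, computed over that epoch's rows only."""
    reports: List[Dict[str, int]] = []
    for rows in epochs:
        batch = list(rows)  # data seen since the previous completion point
        reports.append({name: agg(batch) for name, agg in metrics.items()})
    return reports


# Two "epochs" of data; a batch query would correspond to a single epoch.
reports = observe_epochs([[1, 2, 3], [10, 20]], {"count": len, "sum": sum})
print(reports)  # [{'count': 3, 'sum': 6}, {'count': 2, 'sum': 30}]
```

   The second report does not include the rows of the first epoch, which is the
   behaviour the docstring describes for streaming epochs.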



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        observation : :class:`Observation` or str
+            `str` to specify the name, or an :class:`Observation` instance to obtain the metric.
+
+            .. versionchanged:: 3.4.0
+               Added support for `str` in this parameter.
+        exprs : :class:`Column`
+            column expressions (:class:`Column`).
+
+        Returns
+        -------
+        :class:`DataFrame`
+            the observed :class:`DataFrame`.
+
+        Notes
+        -----
+        When ``observation`` is :class:`Observation`, this method only supports batch queries.
+        When ``observation`` is a string, this method works for both batch and streaming queries.

Review Comment:
   Streaming queries are out of scope for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

