zhengruifeng commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1061134797
##########
python/pyspark/sql/connect/plan.py:
##########
@@ -924,6 +924,55 @@ def plan(self, session: "SparkConnectClient") ->
proto.Relation:
return plan
+class CollectMetrics(LogicalPlan):
+ """Logical plan object for a CollectMetrics operation."""
+
+ def __init__(
+ self,
+ child: Optional["LogicalPlan"],
+ name: str,
+ exprs: List["ColumnOrName"],
+ ) -> None:
+ super().__init__(child)
+ self._name = name
+ self._exprs = exprs
+
+ def col_to_expr(self, col: "ColumnOrName", session: "SparkConnectClient")
-> proto.Expression:
+ if isinstance(col, Column):
+ return col.to_plan(session)
+ else:
+ return self.unresolved_attr(col)
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ assert self._child is not None
+
+ plan = proto.Relation()
+ plan.collect_metrics.input.CopyFrom(self._child.plan(session))
+ plan.collect_metrics.name = self._name
+ plan.collect_metrics.metrics.extend([self.col_to_expr(x, session) for
x in self._exprs])
+ return plan
+
+ def print(self, indent: int = 0) -> str:
Review Comment:
We don't need `print` and `_repr_html_` any more.
##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -158,6 +159,9 @@ message ExecutePlanResponse {
// batch of results and then represent the overall state of the query
execution.
Metrics metrics = 4;
+ // The metrics observed during the execution of the query plan.
+ optional ObservedMetrics observed_metrics = 5;
Review Comment:
I think this should be `repeated`, since a single plan may contain multiple
observations, for example:
```
In [10]: observation1 = Observation("my metrics 1")
In [11]: observation2 = Observation("my metrics 2")
In [12]: observed_df = df.observe(observation1,
count(lit(1)).alias("count"), max(col("age"))).withColumn("xyz",
lit(1)).observe(observation
...: 2, count(lit(1)).alias("count"), max(col("name")))
In [13]: observed_df.count()
Out[13]: 2
In [14]: observation1.get
Out[14]: {'count': 2, 'max(age)': 5}
In [15]: observation2.get
Out[15]: {'count': 2, 'max(name)': 'Bob'}
In [16]: observed_df.explain()
== Physical Plan ==
CollectMetrics my metrics 2, [count(1) AS count#42L, max(name#1) AS
max(name)#44]
+- *(2) Project [age#0L, name#1, 1 AS xyz#37]
+- CollectMetrics my metrics 1, [count(1) AS count#34L, max(age#0L) AS
max(age)#36L]
+- *(1) Scan ExistingRDD[age#0L,name#1]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]