grundprinzip commented on code in PR #39925:
URL: https://github.com/apache/spark/pull/39925#discussion_r1100241635
##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -91,8 +91,11 @@ message Unknown {}
// Common metadata of all relations.
message RelationCommon {
+ // (Required) A globally unique id for a given connect plan.
+ int64 plan_id = 1;
+
// (Required) Shared relation metadata.
- string source_info = 1;
+ string source_info = 2;
Review Comment:
This is a breaking change — please do not do this. Simply assign `plan_id` field
number 2; the numbering does not matter.
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -123,6 +123,11 @@ class SparkConnectPlanner(val session: SparkSession) {
transformRelationPlugin(rel.getExtension)
case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
}
+
+ if (rel.hasCommon) {
+ plan.setTagValue(LogicalPlan.PLAN_ID_TAG, rel.getCommon.getPlanId)
Review Comment:
If `planId` is not set, it will default to 0 — is that the expected behavior?
##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -91,8 +91,11 @@ message Unknown {}
// Common metadata of all relations.
message RelationCommon {
+ // (Required) A globally unique id for a given connect plan.
+ int64 plan_id = 1;
Review Comment:
```suggestion
optional int64 plan_id = 2;
```
##########
python/pyspark/sql/connect/plan.py:
##########
@@ -673,32 +716,34 @@ def plan(self, session: "SparkConnectClient") ->
proto.Relation:
from pyspark.sql.connect.functions import lit
assert self._child is not None
+ plan = proto.Relation()
+ plan.common.plan_id = self._plan_id
- agg = proto.Relation()
-
- agg.aggregate.input.CopyFrom(self._child.plan(session))
+ plan.aggregate.input.CopyFrom(self._child.plan(session))
- agg.aggregate.grouping_expressions.extend([c.to_plan(session) for c in
self._grouping_cols])
- agg.aggregate.aggregate_expressions.extend(
+ plan.aggregate.grouping_expressions.extend(
+ [c.to_plan(session) for c in self._grouping_cols]
+ )
+ plan.aggregate.aggregate_expressions.extend(
[c.to_plan(session) for c in self._aggregate_cols]
)
if self._group_type == "groupby":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY
elif self._group_type == "rollup":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP
elif self._group_type == "cube":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_CUBE
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_CUBE
elif self._group_type == "pivot":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_PIVOT
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_PIVOT
assert self._pivot_col is not None
- agg.aggregate.pivot.col.CopyFrom(self._pivot_col.to_plan(session))
+ plan.aggregate.pivot.col.CopyFrom(self._pivot_col.to_plan(session))
if self._pivot_values is not None and len(self._pivot_values) > 0:
- agg.aggregate.pivot.values.extend(
+ plan.aggregate.pivot.values.extend(
[lit(v).to_plan(session).literal for v in
self._pivot_values]
)
- return agg
+ return plan
Review Comment:
Could the same change be applied here and in the methods below?
##########
python/pyspark/sql/connect/plan.py:
##########
@@ -259,6 +272,8 @@ def __init__(
def plan(self, session: "SparkConnectClient") -> proto.Relation:
plan = proto.Relation()
Review Comment:
can you just add a private helper method?
```
def _create_relation(self) -> proto.Relation:
rel = proto.Relation()
rel.common.plan_id = self._plan_id
return rel
```
and then use this everywhere? It also makes it easier to add the
source info later. In addition, it reduces the risk that a new plan node will
miss adding the ID.
##########
python/pyspark/sql/connect/plan.py:
##########
@@ -673,32 +716,34 @@ def plan(self, session: "SparkConnectClient") ->
proto.Relation:
from pyspark.sql.connect.functions import lit
assert self._child is not None
+ plan = proto.Relation()
+ plan.common.plan_id = self._plan_id
- agg = proto.Relation()
-
- agg.aggregate.input.CopyFrom(self._child.plan(session))
+ plan.aggregate.input.CopyFrom(self._child.plan(session))
- agg.aggregate.grouping_expressions.extend([c.to_plan(session) for c in
self._grouping_cols])
- agg.aggregate.aggregate_expressions.extend(
+ plan.aggregate.grouping_expressions.extend(
+ [c.to_plan(session) for c in self._grouping_cols]
+ )
+ plan.aggregate.aggregate_expressions.extend(
[c.to_plan(session) for c in self._aggregate_cols]
)
if self._group_type == "groupby":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY
elif self._group_type == "rollup":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP
elif self._group_type == "cube":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_CUBE
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_CUBE
elif self._group_type == "pivot":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_PIVOT
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_PIVOT
assert self._pivot_col is not None
- agg.aggregate.pivot.col.CopyFrom(self._pivot_col.to_plan(session))
+ plan.aggregate.pivot.col.CopyFrom(self._pivot_col.to_plan(session))
if self._pivot_values is not None and len(self._pivot_values) > 0:
- agg.aggregate.pivot.values.extend(
+ plan.aggregate.pivot.values.extend(
[lit(v).to_plan(session).literal for v in
self._pivot_values]
)
- return agg
+ return plan
Review Comment:
Why is this refactoring included in this PR?
##########
python/pyspark/sql/connect/plan.py:
##########
@@ -673,32 +716,34 @@ def plan(self, session: "SparkConnectClient") ->
proto.Relation:
from pyspark.sql.connect.functions import lit
assert self._child is not None
+ plan = proto.Relation()
+ plan.common.plan_id = self._plan_id
- agg = proto.Relation()
-
- agg.aggregate.input.CopyFrom(self._child.plan(session))
+ plan.aggregate.input.CopyFrom(self._child.plan(session))
- agg.aggregate.grouping_expressions.extend([c.to_plan(session) for c in
self._grouping_cols])
- agg.aggregate.aggregate_expressions.extend(
+ plan.aggregate.grouping_expressions.extend(
+ [c.to_plan(session) for c in self._grouping_cols]
+ )
+ plan.aggregate.aggregate_expressions.extend(
[c.to_plan(session) for c in self._aggregate_cols]
)
if self._group_type == "groupby":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY
elif self._group_type == "rollup":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP
elif self._group_type == "cube":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_CUBE
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_CUBE
elif self._group_type == "pivot":
- agg.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_PIVOT
+ plan.aggregate.group_type =
proto.Aggregate.GroupType.GROUP_TYPE_PIVOT
assert self._pivot_col is not None
- agg.aggregate.pivot.col.CopyFrom(self._pivot_col.to_plan(session))
+ plan.aggregate.pivot.col.CopyFrom(self._pivot_col.to_plan(session))
if self._pivot_values is not None and len(self._pivot_values) > 0:
- agg.aggregate.pivot.values.extend(
+ plan.aggregate.pivot.values.extend(
[lit(v).to_plan(session).literal for v in
self._pivot_values]
)
- return agg
+ return plan
Review Comment:
Another question: why is `plan` a better variable name than `rel`?
Because the object is a Relation, not a Plan?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]