grundprinzip commented on code in PR #39158:
URL: https://github.com/apache/spark/pull/39158#discussion_r1054812719
##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,14 @@ message ToSchema {
// The Sever side will update the dataframe with this schema.
DataType schema = 2;
}
+
+ message RepartitionByExpression {
+ // (Required) The input relation.
+ Relation input = 1;
+
+ // (Required) The partitioning expressions
+ repeated Expression partition_exprs = 2;
+
+ // (Optional) number of partitions, must be positive.
+ int32 num_partitions = 3;
Review Comment:
```suggestion
optional int32 num_partitions = 3;
```
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
}
}
+ private def transformRepartitionByExpression(
+ rel: proto.RepartitionByExpression): LogicalPlan = {
+ val numPartitionsOpt = if (rel.getNumPartitions == 0) {
+ None
+ } else {
+ Some(rel.getNumPartitions)
+ }
+ val partitionExpressions = rel.getPartitionExprsList.asScala.map { expr =>
+ transformExpression(expr)
+ }.toSeq
Review Comment:
```suggestion
val partitionExpressions =
rel.getPartitionExprsList.asScala.map(transformExpression).toSeq
```
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
}
}
+ private def transformRepartitionByExpression(
+ rel: proto.RepartitionByExpression): LogicalPlan = {
+ val numPartitionsOpt = if (rel.getNumPartitions == 0) {
+ None
Review Comment:
Please return the actual value in the `if` branch and `None` in the `else` branch.
##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:
##########
@@ -705,4 +705,86 @@ class SparkConnectPlannerSuite extends SparkFunSuite with
SparkConnectPlanTest {
}
}
}
+
+ test("RepartitionByExpression") {
Review Comment:
Please also add a test case that is expected to throw an exception during execution.
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
}
}
+ private def transformRepartitionByExpression(
+ rel: proto.RepartitionByExpression): LogicalPlan = {
+ val numPartitionsOpt = if (rel.getNumPartitions == 0) {
Review Comment:
Once `num_partitions` is declared `optional` in the proto, you can use `if (rel.hasNumPartitions) {` here instead of comparing against `0`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]