fqaiser94 commented on a change in pull request #29812:
URL: https://github.com/apache/spark/pull/29812#discussion_r491929282
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/WithFields.scala
##########
@@ -17,16 +17,29 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.catalyst.expressions.WithFields
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField,
WithFields}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
/**
- * Combines all adjacent [[WithFields]] expression into a single
[[WithFields]] expression.
+ * Optimizes [[WithFields]] expression chains.
*/
-object CombineWithFields extends Rule[LogicalPlan] {
+object OptimizeWithFields extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case WithFields(structExpr, names, values) if names.distinct.length !=
names.length =>
+ val newNames = mutable.ArrayBuffer.empty[String]
+ val newValues = mutable.ArrayBuffer.empty[Expression]
+ names.zip(values).reverse.foreach { case (name, value) =>
+ if (!newNames.contains(name)) {
Review comment:
should use `resolver` here, otherwise I think we will have correctness
issues.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/WithFields.scala
##########
@@ -17,16 +17,29 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.catalyst.expressions.WithFields
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField,
WithFields}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
/**
- * Combines all adjacent [[WithFields]] expression into a single
[[WithFields]] expression.
+ * Optimizes [[WithFields]] expression chains.
*/
-object CombineWithFields extends Rule[LogicalPlan] {
+object OptimizeWithFields extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case WithFields(structExpr, names, values) if names.distinct.length !=
names.length =>
+ val newNames = mutable.ArrayBuffer.empty[String]
+ val newValues = mutable.ArrayBuffer.empty[Expression]
+ names.zip(values).reverse.foreach { case (name, value) =>
+ if (!newNames.contains(name)) {
+ newNames += name
+ newValues += value
+ }
+ }
+ WithFields(structExpr, names = newNames.reverse.toSeq, valExprs =
newValues.reverse.toSeq)
Review comment:
For my understanding, can you explain how we expect to benefit from this
optimization?
I ask because we do this kind of deduplication inside of `WithFields`
already as part of the `foldLeft` operation
[here](https://github.com/apache/spark/blob/d01594e8d186e63a6c3ce361e756565e830d5237/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L578).
It will only keep the last `valExpr` for each `name`. So I think the optimized
logical plan will be the same with or without this optimization in all
scenarios? Correct me if I'm wrong.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/WithFields.scala
##########
@@ -17,16 +17,29 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.catalyst.expressions.WithFields
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField,
WithFields}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
/**
- * Combines all adjacent [[WithFields]] expression into a single
[[WithFields]] expression.
+ * Optimizes [[WithFields]] expression chains.
*/
-object CombineWithFields extends Rule[LogicalPlan] {
+object OptimizeWithFields extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case WithFields(structExpr, names, values) if names.distinct.length !=
names.length =>
Review comment:
could this `case` statement be after the next `case` statement? So that
we combine the chains first before deduplicating?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/WithFields.scala
##########
@@ -17,16 +17,29 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.catalyst.expressions.WithFields
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField,
WithFields}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
/**
- * Combines all adjacent [[WithFields]] expression into a single
[[WithFields]] expression.
+ * Optimizes [[WithFields]] expression chains.
*/
-object CombineWithFields extends Rule[LogicalPlan] {
+object OptimizeWithFields extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case WithFields(structExpr, names, values) if names.distinct.length !=
names.length =>
+ val newNames = mutable.ArrayBuffer.empty[String]
+ val newValues = mutable.ArrayBuffer.empty[Expression]
+ names.zip(values).reverse.foreach { case (name, value) =>
+ if (!newNames.contains(name)) {
+ newNames += name
+ newValues += value
+ }
+ }
+ WithFields(structExpr, names = newNames.reverse.toSeq, valExprs =
newValues.reverse.toSeq)
Review comment:
Okay, so I took a look at the PR you linked and left a related
[comment](https://github.com/apache/spark/pull/29587/files#r493098043) there. I
don't think you actually need this optimization for `unionByName`.
This optimization is only useful if someone uses `WithFields` to update the
same field multiple times. However, it would simply be better to **not** update
the same field multiple times. At the very least, we should not do this when we
re-use this Expression internally within Spark.
Unfortunately, "bad" end-users might still update the same field multiple
times. Assuming we should optimize for such users (not sure), since this batch
is only applied half-way through the optimization cycle, I think we could just
move up the `Batch("ReplaceWithFieldsExpression", Once,
ReplaceWithFieldsExpression)` to get the same benefit (which is just simplified
tree). What do you reckon?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/WithFields.scala
##########
@@ -17,16 +17,29 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.catalyst.expressions.WithFields
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField,
WithFields}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
/**
- * Combines all adjacent [[WithFields]] expression into a single
[[WithFields]] expression.
+ * Optimizes [[WithFields]] expression chains.
*/
-object CombineWithFields extends Rule[LogicalPlan] {
+object OptimizeWithFields extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case WithFields(structExpr, names, values) if names.distinct.length !=
names.length =>
+ val newNames = mutable.ArrayBuffer.empty[String]
+ val newValues = mutable.ArrayBuffer.empty[Expression]
+ names.zip(values).reverse.foreach { case (name, value) =>
+ if (!newNames.contains(name)) {
+ newNames += name
+ newValues += value
+ }
+ }
+ WithFields(structExpr, names = newNames.reverse.toSeq, valExprs =
newValues.reverse.toSeq)
Review comment:
Okay, so I took a look at the PR you linked and left a related
[comment](https://github.com/apache/spark/pull/29587/files#r493098043) there. I
don't think you actually need this optimization for `unionByName`.
This optimization is only useful if someone uses `WithFields` to update the
same field multiple times. However, it would simply be better to **not** update
the same field multiple times. At the very least, we should not do this when we
re-use this Expression internally within Spark.
Unfortunately, "bad" end-users might still update the same field multiple
times. Assuming we should optimize for such users (not sure), since this batch
is only applied half-way through the optimization cycle anyway, I think we
could just move up the `Batch("ReplaceWithFieldsExpression", Once,
ReplaceWithFieldsExpression)` to get the same benefit (which is just simplified
tree). What do you reckon?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/WithFields.scala
##########
@@ -17,16 +17,29 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.catalyst.expressions.WithFields
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField,
WithFields}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
/**
- * Combines all adjacent [[WithFields]] expression into a single
[[WithFields]] expression.
+ * Optimizes [[WithFields]] expression chains.
*/
-object CombineWithFields extends Rule[LogicalPlan] {
+object OptimizeWithFields extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case WithFields(structExpr, names, values) if names.distinct.length !=
names.length =>
+ val newNames = mutable.ArrayBuffer.empty[String]
+ val newValues = mutable.ArrayBuffer.empty[Expression]
+ names.zip(values).reverse.foreach { case (name, value) =>
+ if (!newNames.contains(name)) {
+ newNames += name
+ newValues += value
+ }
+ }
+ WithFields(structExpr, names = newNames.reverse.toSeq, valExprs =
newValues.reverse.toSeq)
Review comment:
Okay, so I took a look at the PR you linked and left a related
[comment](https://github.com/apache/spark/pull/29587/files#r493098043) there. I
don't think you actually need this optimization for #29587.
This optimization is only useful if someone uses `WithFields` to update the
same field multiple times. However, it would simply be better to **not** update
the same field multiple times. At the very least, we should not do this when we
re-use this Expression internally within Spark.
Unfortunately, "bad" end-users might still update the same field multiple
times. Assuming we should optimize for such users (not sure), since this batch
is only applied half-way through the optimization cycle anyway, I think we
could just move up the `Batch("ReplaceWithFieldsExpression", Once,
ReplaceWithFieldsExpression)` to get the same benefit (which is just simplified
tree). What do you reckon?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/WithFields.scala
##########
@@ -17,16 +17,29 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.catalyst.expressions.WithFields
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField,
WithFields}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
/**
- * Combines all adjacent [[WithFields]] expression into a single
[[WithFields]] expression.
+ * Optimizes [[WithFields]] expression chains.
*/
-object CombineWithFields extends Rule[LogicalPlan] {
+object OptimizeWithFields extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case WithFields(structExpr, names, values) if names.distinct.length !=
names.length =>
+ val newNames = mutable.ArrayBuffer.empty[String]
+ val newValues = mutable.ArrayBuffer.empty[Expression]
+ names.zip(values).reverse.foreach { case (name, value) =>
+ if (!newNames.contains(name)) {
+ newNames += name
+ newValues += value
+ }
+ }
+ WithFields(structExpr, names = newNames.reverse.toSeq, valExprs =
newValues.reverse.toSeq)
Review comment:
ahh I see, yes, in the analysis stage this would likely be helpful!
Okay in that case, could this PR wait till #29795 goes in? I'm refactoring
`WithFields` so this optimization would need to change anyway.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]