maropu commented on a change in pull request #30300:
URL: https://github.com/apache/spark/pull/30300#discussion_r520213847
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -48,13 +42,13 @@ trait AliasAwareOutputExpression extends UnaryExecNode {
*/
trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
final override def outputPartitioning: Partitioning = {
- if (hasAlias) {
- child.outputPartitioning match {
- case h: HashPartitioning => h.copy(expressions =
replaceAliases(h.expressions))
Review comment:
If we generalize this logic, I think it's better to add tests for all the
partitioning cases where possible.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -48,13 +42,13 @@ trait AliasAwareOutputExpression extends UnaryExecNode {
*/
trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
final override def outputPartitioning: Partitioning = {
- if (hasAlias) {
- child.outputPartitioning match {
- case h: HashPartitioning => h.copy(expressions =
replaceAliases(h.expressions))
- case other => other
- }
- } else {
- child.outputPartitioning
+ child.outputPartitioning match {
+ case e: Expression if hasAlias =>
+ val normalizedExp = e.transformDown {
Review comment:
nit: `normalizedExp` -> `normalizedExpr` and `transformDown` ->
`transform`
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
##########
@@ -895,6 +895,73 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
}
}
+ test("No extra exchanges in case of [Inner Join -> Project with aliases ->
Inner join]") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") {
Review comment:
Why do we need to turn off constraint propagation?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -25,20 +25,14 @@ import
org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
trait AliasAwareOutputExpression extends UnaryExecNode {
protected def outputExpressions: Seq[NamedExpression]
- protected def hasAlias: Boolean = outputExpressions.collectFirst { case _:
Alias => }.isDefined
+ lazy val aliasMap = AttributeMap(outputExpressions.collect {
+ case a @ Alias(child: AttributeReference, _) => (child, a.toAttribute)
+ })
- protected def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = {
- exprs.map {
- case a: AttributeReference => replaceAlias(a).getOrElse(a)
- case other => other
- }
- }
+ protected def hasAlias: Boolean = aliasMap.nonEmpty
protected def replaceAlias(attr: AttributeReference): Option[Attribute] = {
Review comment:
It looks like `AliasHelper` has a similar function; could you check whether we
can reuse it?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -25,20 +25,14 @@ import
org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
trait AliasAwareOutputExpression extends UnaryExecNode {
protected def outputExpressions: Seq[NamedExpression]
- protected def hasAlias: Boolean = outputExpressions.collectFirst { case _:
Alias => }.isDefined
+ lazy val aliasMap = AttributeMap(outputExpressions.collect {
Review comment:
`private`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -25,20 +25,14 @@ import
org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
trait AliasAwareOutputExpression extends UnaryExecNode {
protected def outputExpressions: Seq[NamedExpression]
- protected def hasAlias: Boolean = outputExpressions.collectFirst { case _:
Alias => }.isDefined
+ lazy val aliasMap = AttributeMap(outputExpressions.collect {
Review comment:
Also, could you use `AliasHelper`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -48,13 +42,13 @@ trait AliasAwareOutputExpression extends UnaryExecNode {
*/
trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
final override def outputPartitioning: Partitioning = {
- if (hasAlias) {
- child.outputPartitioning match {
- case h: HashPartitioning => h.copy(expressions =
replaceAliases(h.expressions))
- case other => other
- }
- } else {
- child.outputPartitioning
+ child.outputPartitioning match {
+ case e: Expression if hasAlias =>
+ val normalizedExp = e.transformDown {
Review comment:
btw, do we need `transform` here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]