maropu commented on a change in pull request #30894:
URL: https://github.com/apache/spark/pull/30894#discussion_r551687438
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
##########
@@ -175,20 +175,58 @@ abstract class UnaryNode extends LogicalPlan {
*/
protected def getAllValidConstraints(projectList: Seq[NamedExpression]):
ExpressionSet = {
var allConstraints = child.constraints
- projectList.foreach {
- case a @ Alias(l: Literal, _) =>
- allConstraints += EqualNullSafe(a.toAttribute, l)
- case a @ Alias(e, _) =>
- // For every alias in `projectList`, replace the reference in
constraints by its attribute.
- allConstraints ++= allConstraints.map(_ transform {
- case expr: Expression if expr.semanticEquals(e) =>
- a.toAttribute
+
+ // For each expression collect its aliases
+ val aliasMap = projectList.collect{
Review comment:
nit: `collect{` -> `collect {`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
##########
@@ -175,20 +175,58 @@ abstract class UnaryNode extends LogicalPlan {
*/
protected def getAllValidConstraints(projectList: Seq[NamedExpression]):
ExpressionSet = {
var allConstraints = child.constraints
- projectList.foreach {
- case a @ Alias(l: Literal, _) =>
- allConstraints += EqualNullSafe(a.toAttribute, l)
- case a @ Alias(e, _) =>
- // For every alias in `projectList`, replace the reference in
constraints by its attribute.
- allConstraints ++= allConstraints.map(_ transform {
- case expr: Expression if expr.semanticEquals(e) =>
- a.toAttribute
+
+ // For each expression collect its aliases
+ val aliasMap = projectList.collect{
+ case alias @ Alias(expr, _) if !expr.foldable && expr.deterministic =>
+ (expr.canonicalized, alias)
+ }.groupBy(_._1).mapValues(_.map(_._2))
+ val remainingExpressions = collection.mutable.Set(aliasMap.keySet.toSeq:
_*)
+
+ /**
+ * Filtering allConstraints between each iteration is necessary, because
+ * otherwise collecting valid constraints could in the worst case have
exponential
+ * time and memory complexity. Each replaced alias could double the number
of constraints,
+ * because we would keep both the original constraint and the one with
alias.
+ */
+ def shouldBeKept(expr: Expression): Boolean = {
+ expr.references.subsetOf(outputSet) ||
+ remainingExpressions.contains(expr.canonicalized) ||
+ (expr.children.nonEmpty && expr.children.forall(shouldBeKept))
+ }
+
+ // Replace expressions with aliases
+ for ((expr, aliases) <- aliasMap) {
+ allConstraints ++= allConstraints.flatMap(constraint => {
+ aliases.map(alias => {
+ constraint transform {
+ case e: Expression if e.semanticEquals(expr) =>
+ alias.toAttribute
+ }
})
- allConstraints += EqualNullSafe(e, a.toAttribute)
+ })
+
+ for { alias1 <- aliases; alias2 <- aliases } {
+ if (!alias1.fastEquals(alias2)) {
+ allConstraints += EqualNullSafe(alias1.toAttribute,
alias2.toAttribute)
Review comment:
Could we move this code out of the outer loop "`for ((expr, aliases)
<- aliasMap) {`"? It seems we don't need to add these constraints to
`allConstraints` inside the loop.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
##########
@@ -175,20 +175,58 @@ abstract class UnaryNode extends LogicalPlan {
*/
protected def getAllValidConstraints(projectList: Seq[NamedExpression]):
ExpressionSet = {
var allConstraints = child.constraints
- projectList.foreach {
- case a @ Alias(l: Literal, _) =>
- allConstraints += EqualNullSafe(a.toAttribute, l)
- case a @ Alias(e, _) =>
- // For every alias in `projectList`, replace the reference in
constraints by its attribute.
- allConstraints ++= allConstraints.map(_ transform {
- case expr: Expression if expr.semanticEquals(e) =>
- a.toAttribute
+
+ // For each expression collect its aliases
+ val aliasMap = projectList.collect{
+ case alias @ Alias(expr, _) if !expr.foldable && expr.deterministic =>
+ (expr.canonicalized, alias)
+ }.groupBy(_._1).mapValues(_.map(_._2))
+ val remainingExpressions = collection.mutable.Set(aliasMap.keySet.toSeq:
_*)
+
+ /**
+ * Filtering allConstraints between each iteration is necessary, because
+ * otherwise collecting valid constraints could in the worst case have
exponential
+ * time and memory complexity. Each replaced alias could double the number
of constraints,
+ * because we would keep both the original constraint and the one with
alias.
+ */
+ def shouldBeKept(expr: Expression): Boolean = {
+ expr.references.subsetOf(outputSet) ||
+ remainingExpressions.contains(expr.canonicalized) ||
+ (expr.children.nonEmpty && expr.children.forall(shouldBeKept))
+ }
+
+ // Replace expressions with aliases
+ for ((expr, aliases) <- aliasMap) {
+ allConstraints ++= allConstraints.flatMap(constraint => {
+ aliases.map(alias => {
+ constraint transform {
+ case e: Expression if e.semanticEquals(expr) =>
+ alias.toAttribute
+ }
})
- allConstraints += EqualNullSafe(e, a.toAttribute)
+ })
+
+ for { alias1 <- aliases; alias2 <- aliases } {
+ if (!alias1.fastEquals(alias2)) {
+ allConstraints += EqualNullSafe(alias1.toAttribute,
alias2.toAttribute)
+ }
+ }
+
+ remainingExpressions.remove(expr)
+ allConstraints = allConstraints.filter(shouldBeKept)
+ }
+
+ /**
+ We keep the child constraints and equality between original and aliased
attributes,
+ so [[ConstraintHelper.inferAdditionalConstraints]] would have the full
information available.
+ */
Review comment:
nit: please fix the block comment format:
```
/**
* We keep the child constraints and equality between original and
aliased attributes,
* so [[ConstraintHelper.inferAdditionalConstraints]] would have the
full information available.
*/
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
##########
@@ -175,20 +175,58 @@ abstract class UnaryNode extends LogicalPlan {
*/
protected def getAllValidConstraints(projectList: Seq[NamedExpression]):
ExpressionSet = {
var allConstraints = child.constraints
- projectList.foreach {
- case a @ Alias(l: Literal, _) =>
- allConstraints += EqualNullSafe(a.toAttribute, l)
- case a @ Alias(e, _) =>
- // For every alias in `projectList`, replace the reference in
constraints by its attribute.
- allConstraints ++= allConstraints.map(_ transform {
- case expr: Expression if expr.semanticEquals(e) =>
- a.toAttribute
+
+ // For each expression collect its aliases
+ val aliasMap = projectList.collect{
+ case alias @ Alias(expr, _) if !expr.foldable && expr.deterministic =>
+ (expr.canonicalized, alias)
+ }.groupBy(_._1).mapValues(_.map(_._2))
+ val remainingExpressions = collection.mutable.Set(aliasMap.keySet.toSeq:
_*)
+
+ /**
+ * Filtering allConstraints between each iteration is necessary, because
+ * otherwise collecting valid constraints could in the worst case have
exponential
+ * time and memory complexity. Each replaced alias could double the number
of constraints,
+ * because we would keep both the original constraint and the one with
alias.
+ */
+ def shouldBeKept(expr: Expression): Boolean = {
+ expr.references.subsetOf(outputSet) ||
+ remainingExpressions.contains(expr.canonicalized) ||
+ (expr.children.nonEmpty && expr.children.forall(shouldBeKept))
+ }
+
+ // Replace expressions with aliases
+ for ((expr, aliases) <- aliasMap) {
+ allConstraints ++= allConstraints.flatMap(constraint => {
+ aliases.map(alias => {
+ constraint transform {
+ case e: Expression if e.semanticEquals(expr) =>
+ alias.toAttribute
+ }
})
- allConstraints += EqualNullSafe(e, a.toAttribute)
+ })
+
+ for { alias1 <- aliases; alias2 <- aliases } {
Review comment:
Can't we rewrite this using `combinations`, like this?
```
aliases.combinations(2).map {
case Seq(a1, a2) => EqualNullSafe(a1.toAttribute, a2.toAttribute)
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]