beliefer commented on a change in pull request #35768:
URL: https://github.com/apache/spark/pull/35768#discussion_r830424150
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
##########
@@ -39,48 +45,102 @@ class V2ExpressionBuilder(e: Expression) {
case _ => false
}
- private def generateExpression(expr: Expression): Option[V2Expression] =
expr match {
+ private def generateExpression(
+ expr: Expression, isPredicate: Boolean = false): Option[V2Expression] =
expr match {
+ case Literal(true, BooleanType) => Some(new AlwaysTrue())
+ case Literal(false, BooleanType) => Some(new AlwaysFalse())
case Literal(value, dataType) => Some(LiteralValue(value, dataType))
- case attr: Attribute => Some(FieldReference.column(attr.name))
+ case col @ pushableColumn(name) if nestedPredicatePushdownEnabled =>
+ if (isPredicate && col.dataType.isInstanceOf[BooleanType]) {
+ Some(new V2Predicate("=", Array(FieldReference(name),
LiteralValue(true, BooleanType))))
+ } else {
+ Some(FieldReference(name))
+ }
+ case pushableColumn(name) if !nestedPredicatePushdownEnabled =>
+ Some(FieldReference.column(name))
+ case in @ InSet(child, hset) =>
+ generateExpression(child).map { v =>
+ val children =
+ (v +: hset.toSeq.map(elem => LiteralValue(elem,
in.dataType))).toArray[V2Expression]
+ new V2Predicate("IN", children)
+ }
+ // Because we only convert In to InSet in Optimizer when there are more
than certain
+ // items. So it is possible we still get an In expression here that needs
to be pushed
+ // down.
+ case In(value, list) =>
+ val v = generateExpression(value)
+ val listExpressions = list.flatMap(generateExpression(_))
+ if (v.isDefined && list.length == listExpressions.length) {
+ val children = (v.get +: listExpressions).toArray[V2Expression]
+ // The children looks like [expr, value1, ..., valueN]
+ Some(new V2Predicate("IN", children))
+ } else {
+ None
+ }
case IsNull(col) => generateExpression(col)
- .map(c => new GeneralScalarExpression("IS_NULL", Array[V2Expression](c)))
+ .map(c => new V2Predicate("IS_NULL", Array[V2Expression](c)))
case IsNotNull(col) => generateExpression(col)
- .map(c => new GeneralScalarExpression("IS_NOT_NULL",
Array[V2Expression](c)))
+ .map(c => new V2Predicate("IS_NOT_NULL", Array[V2Expression](c)))
+ case p: StringPredicate =>
+ val left = generateExpression(p.left)
+ val right = generateExpression(p.right)
+ if (left.isDefined && right.isDefined) {
+ val name = p match {
+ case _: StartsWith => "STARTS_WITH"
+ case _: EndsWith => "ENDS_WITH"
+ case _: Contains => "CONTAINS"
+ }
+ Some(new V2Predicate(name, Array[V2Expression](left.get, right.get)))
+ } else {
+ None
+ }
case b: BinaryOperator if canTranslate(b) =>
- val left = generateExpression(b.left)
- val right = generateExpression(b.right)
+ // AND/OR expect predicate
+ val left = generateExpression(b.left, b.isInstanceOf[And] ||
b.isInstanceOf[Or])
+ val right = generateExpression(b.right, b.isInstanceOf[And] ||
b.isInstanceOf[Or])
if (left.isDefined && right.isDefined) {
- Some(new GeneralScalarExpression(b.sqlOperator,
Array[V2Expression](left.get, right.get)))
+ if (b.isInstanceOf[Predicate]) {
+ Some(new V2Predicate(b.sqlOperator, Array[V2Expression](left.get,
right.get)))
+ } else {
+ Some(new GeneralScalarExpression(b.sqlOperator,
Array[V2Expression](left.get, right.get)))
+ }
} else {
None
}
case Not(eq: EqualTo) =>
val left = generateExpression(eq.left)
val right = generateExpression(eq.right)
if (left.isDefined && right.isDefined) {
- Some(new GeneralScalarExpression("!=", Array[V2Expression](left.get,
right.get)))
+ Some(new V2Predicate("<>", Array[V2Expression](left.get, right.get)))
} else {
None
}
- case Not(child) => generateExpression(child)
- .map(v => new GeneralScalarExpression("NOT", Array[V2Expression](v)))
+ case Not(child) => generateExpression(child, true) // NOT expects predicate
+ .map(v => new V2Predicate("NOT", Array[V2Expression](v)))
case UnaryMinus(child, true) => generateExpression(child)
.map(v => new GeneralScalarExpression("-", Array[V2Expression](v)))
case BitwiseNot(child) => generateExpression(child)
.map(v => new GeneralScalarExpression("~", Array[V2Expression](v)))
case CaseWhen(branches, elseValue) =>
- val conditions = branches.map(_._1).flatMap(generateExpression)
- val values = branches.map(_._2).flatMap(generateExpression)
+ val conditions = branches.map(_._1).flatMap(generateExpression(_, true))
+ val values = branches.map(_._2).flatMap(generateExpression(_, true))
if (conditions.length == branches.length && values.length ==
branches.length) {
val branchExpressions = conditions.zip(values).flatMap { case (c, v) =>
Seq[V2Expression](c, v)
}
if (elseValue.isDefined) {
- elseValue.flatMap(generateExpression).map { v =>
+ elseValue.flatMap(generateExpression(_)).map { v =>
val children = (branchExpressions :+ v).toArray[V2Expression]
// The children looks like [condition1, value1, ..., conditionN,
valueN, elseValue]
- new GeneralScalarExpression("CASE_WHEN", children)
+ if (isPredicate) {
+ new V2Predicate("CASE_WHEN", children)
Review comment:
OK
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]