maropu commented on a change in pull request #23783: [SPARK-26854][SQL] Support
ANY/SOME subquery
URL: https://github.com/apache/spark/pull/23783#discussion_r257505210
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
##########
@@ -195,15 +201,112 @@ case class InSubquery(values: Seq[Expression], query: ListQuery)
TypeUtils.checkForOrderingExpr(value.dataType, s"function $prettyName")
}
}
-
+ // scalastyle:on line.size.limit
override def children: Seq[Expression] = values :+ query
override def nullable: Boolean = children.exists(_.nullable)
override def foldable: Boolean = children.forall(_.foldable)
+}
+
+object PredicateSubquery {
+ /**
+ * Rewrite `NotEqualTo` as `Not(EqualTo)` and reserve others.
+ */
+ def rewritePredicate(cmp: BinaryComparison)
+ : (Expression, Expression) => Expression = cmp match {
+ case _: NotEqualTo =>
+ (left, right) => Not(EqualTo(left, right))
+ case other =>
+ getPredicate(cmp)
+ }
+
+ def getPredicate(cmp: BinaryComparison)
+ : (Expression, Expression) => BinaryComparison = cmp match {
+ case _: EqualTo =>
+ EqualTo
+ case _: EqualNullSafe =>
+ EqualNullSafe
+ case _: NotEqualTo =>
+ NotEqualTo
+ case _: LessThan =>
+ LessThan
+ case _: LessThanOrEqual =>
+ LessThanOrEqual
+ case _: GreaterThan =>
+ GreaterThan
+ case _: GreaterThanOrEqual =>
+ GreaterThanOrEqual
+ }
+
+ private def combineValues(values: Seq[Expression]): Expression = {
+ if (values.length > 1) {
+ CreateNamedStruct(values.zipWithIndex.flatMap {
+ case (v: NamedExpression, _) => Seq(Literal(v.name), v)
+ case (v, idx) => Seq(Literal(s"_$idx"), v)
+ })
+ } else {
+ values.head
+ }
+ }
+
+ def apply (
+ p: PredicateSubquery,
+ values: Seq[Expression],
+ comparison: BinaryComparison,
+ query: ListQuery): PredicateSubquery = p match {
+ case _: InSubquery =>
+ InSubquery(values, query)
+ case _: AnySubquery =>
+ AnySubquery(getPredicate(comparison)(combineValues(values), query))
+ }
+
+ def unapply(p: PredicateSubquery)
+ : Option[(Seq[Expression], BinaryComparison, ListQuery)] = {
+ if (p == null) {
+ None
+ } else {
+ Some((p.values, p.comparison, p.query))
+ }
+ }
+}
+
+/**
+ * Evaluates to `true` if the comparison between `values`
+ * and any row in `query`'s result set returns `true`.
+ */
+case class AnySubquery(values: Seq[Expression], cmp: BinaryComparison, query: ListQuery)
+ extends PredicateSubquery {
+
+ override val comparison: BinaryComparison = cmp
+
+ override def predicateName: String = "ANY"
+ override def toString: String = s"$value ${comparison.symbol} ANY ($query)"
+ override def sql: String = s"(${value.sql} ${comparison.symbol} ANY (${query.sql}))"
+}
+
+object AnySubquery {
+ def apply(comparison: BinaryComparison): AnySubquery = {
+ val values: Seq[Expression] = comparison.left match {
+ case c: CreateNamedStruct => c.valExprs
+ case other => Seq(other)
+ }
+ val query: ListQuery = comparison.right.asInstanceOf[ListQuery]
+
+ AnySubquery(values, comparison, query)
+ }
+}
+
+/**
+ * Evaluates to `true` if `values` are returned in `query`'s result set.
+ */
+case class InSubquery(values: Seq[Expression], query: ListQuery) extends PredicateSubquery {
+
+ override val comparison: BinaryComparison = EqualTo(value, query)
+
+ override def predicateName: String = "IN"
override def toString: String = s"$value IN ($query)"
override def sql: String = s"(${value.sql} IN (${query.sql}))"
}
-
Review comment:
revert this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]