IvanVergiliev commented on a change in pull request #24068: [SPARK-27105][SQL]
Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r289363659
##########
File path:
sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
##########
@@ -143,145 +115,231 @@ private[sql] object OrcFilters extends OrcFiltersBase {
case _ => value
}
+ import org.apache.spark.sql.sources._
+ import OrcFilters._
+
+ private[sql] def trimUnconvertibleFilters(expression: Filter):
Option[Filter] = {
+ performFilter(expression, canPartialPushDownConjuncts = true)
+ }
+
/**
- * Build a SearchArgument and return the builder so far.
+ * Builds a SearchArgument for a Filter by first trimming the
non-convertible nodes, and then
+ * only building the remaining convertible nodes.
+ *
+ * Doing the conversion in this way avoids the computational complexity
problems introduced by
+ * checking whether a node is convertible while building it. The approach
implemented here has
+ * complexity that's linear in the size of the Filter tree - O(number of
Filter nodes) - we run
+ * a single pass over the tree to trim it, and then another pass on the
trimmed tree to convert
+ * the remaining nodes.
+ *
+ * The alternative approach of checking-while-building can (and did) result
+ * in exponential complexity in the height of the tree, causing perf
problems with Filters with
+ * as few as ~35 nodes if they were skewed.
*/
- private def buildSearchArgument(
- dataTypeMap: Map[String, DataType],
+ private[sql] def buildSearchArgument(
expression: Filter,
builder: Builder): Option[Builder] = {
- createBuilder(dataTypeMap, expression, builder,
canPartialPushDownConjuncts = true)
+ trimUnconvertibleFilters(expression).map { filter =>
+ updateBuilder(filter, builder)
+ builder
+ }
}
- /**
- * @param dataTypeMap a map from the attribute name to its data type.
- * @param expression the input filter predicates.
- * @param builder the input SearchArgument.Builder.
- * @param canPartialPushDownConjuncts whether a subset of conjuncts of
predicates can be pushed
- * down safely. Pushing ONLY one side of
AND down is safe to
- * do at the top level or none of its
ancestors is NOT and OR.
- * @return the builder so far.
- */
- private def createBuilder(
- dataTypeMap: Map[String, DataType],
- expression: Filter,
- builder: Builder,
- canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+ sealed trait ActionType[ReturnType]
+ case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
+ extends ActionType[Option[Filter]]
+ case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]
+
+ // The performAction method can run both the filtering and building
operations for a given
+ // node - we signify which one we want with the `actionType` parameter.
+ //
+ // There are a couple of benefits to coupling the two operations like this:
+ // 1. All the logic for a given predicate is grouped logically in the same
place. You don't
+ // have to scroll across the whole file to see what the filter action for
an And is while
+ // you're looking at the build action.
+ // 2. It's much easier to keep the implementations of the two operations
up-to-date with
+ // each other. If the `filter` and `build` operations are implemented as
separate case-matches
+ // in different methods, it's very easy to change one without
appropriately updating the
+ // other. For example, if we add a new supported node type to `filter`, it
would be very
+ // easy to forget to update `build` to support it too, thus leading to
conversion errors.
+ private def performAction[ReturnType](
+ actionType: ActionType[ReturnType],
+ expression: Filter): ReturnType = {
def getType(attribute: String): PredicateLeaf.Type =
getPredicateLeafType(dataTypeMap(attribute))
- import org.apache.spark.sql.sources._
-
expression match {
case And(left, right) =>
- // At here, it is not safe to just convert one side and remove the
other side
- // if we do not understand what the parent filters are.
- //
- // Here is an example used to explain the reason.
- // Let's say we have NOT(a = 2 AND b in ('1')) and we do not
understand how to
- // convert b in ('1'). If we only convert a = 2, we will end up with a
filter
- // NOT(a = 2), which will generate wrong results.
- //
- // Pushing one side of AND down is only safe to do at the top level or
in the child
- // AND before hitting NOT or OR conditions, and in this case, the
unsupported predicate
- // can be safely removed.
- val leftBuilderOption =
- createBuilder(dataTypeMap, left, newBuilder,
canPartialPushDownConjuncts)
- val rightBuilderOption =
- createBuilder(dataTypeMap, right, newBuilder,
canPartialPushDownConjuncts)
- (leftBuilderOption, rightBuilderOption) match {
- case (Some(_), Some(_)) =>
- for {
- lhs <- createBuilder(dataTypeMap, left,
- builder.startAnd(), canPartialPushDownConjuncts)
- rhs <- createBuilder(dataTypeMap, right, lhs,
canPartialPushDownConjuncts)
- } yield rhs.end()
-
- case (Some(_), None) if canPartialPushDownConjuncts =>
- createBuilder(dataTypeMap, left, builder,
canPartialPushDownConjuncts)
-
- case (None, Some(_)) if canPartialPushDownConjuncts =>
- createBuilder(dataTypeMap, right, builder,
canPartialPushDownConjuncts)
-
- case _ => None
+ actionType match {
+ case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) =>
+ // At here, it is not safe to just keep one side and remove the
other side
+ // if we do not understand what the parent filters are.
+ //
+ // Here is an example used to explain the reason.
+ // Let's say we have NOT(a = 2 AND b in ('1')) and we do not
understand how to
+ // convert b in ('1'). If we only convert a = 2, we will end up
with a filter
+ // NOT(a = 2), which will generate wrong results.
+ //
+ // Pushing one side of AND down is only safe to do at the top
level or in the child
+ // AND before hitting NOT or OR conditions, and in this case, the
unsupported
+ // predicate can be safely removed.
+ val lhs = performAction(t, left)
+ val rhs = performAction(t, right)
+ (lhs, rhs) match {
+ case (Some(l), Some(r)) => Some(And(l, r))
+ case (Some(_), None) if canPartialPushDownConjuncts => lhs
+ case (None, Some(_)) if canPartialPushDownConjuncts => rhs
+ case _ => None
+ }
+ case BuildSearchArgument(builder) =>
+ builder.startAnd()
+ updateBuilder(left, builder)
+ updateBuilder(right, builder)
+ builder.end()
+ ()
}
case Or(left, right) =>
- // The Or predicate is convertible when both of its children can be
pushed down.
- // That is to say, if one/both of the children can be partially pushed
down, the Or
- // predicate can be partially pushed down as well.
- //
- // Here is an example used to explain the reason.
- // Let's say we have
- // (a1 AND a2) OR (b1 AND b2),
- // a1 and b1 is convertible, while a2 and b2 is not.
- // The predicate can be converted as
- // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
- // As per the logical in And predicate, we can push down (a1 OR b1).
- for {
- _ <- createBuilder(dataTypeMap, left, newBuilder,
canPartialPushDownConjuncts)
- _ <- createBuilder(dataTypeMap, right, newBuilder,
canPartialPushDownConjuncts)
- lhs <- createBuilder(dataTypeMap, left,
- builder.startOr(), canPartialPushDownConjuncts)
- rhs <- createBuilder(dataTypeMap, right, lhs,
canPartialPushDownConjuncts)
- } yield rhs.end()
+ actionType match {
+ case t: TrimUnconvertibleFilters =>
+ // The Or predicate is convertible when both of its children can
be pushed down.
+ // That is to say, if one/both of the children can be partially
pushed down, the Or
+ // predicate can be partially pushed down as well.
+ //
+ // Here is an example used to explain the reason.
+ // Let's say we have
+ // (a1 AND a2) OR (b1 AND b2),
+ // a1 and b1 is convertible, while a2 and b2 is not.
+ // The predicate can be converted as
+ // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+ // As per the logical in And predicate, we can push down (a1 OR
b1).
+ for {
+ lhs: Filter <- performAction(t, left)
+ rhs: Filter <- performAction(t, right)
+ } yield Or(lhs, rhs)
+ case BuildSearchArgument(builder) =>
+ builder.startOr()
+ updateBuilder(left, builder)
+ updateBuilder(right, builder)
+ builder.end()
+ ()
+ }
case Not(child) =>
- for {
- _ <- createBuilder(dataTypeMap, child, newBuilder,
canPartialPushDownConjuncts = false)
- negate <- createBuilder(dataTypeMap,
- child, builder.startNot(), canPartialPushDownConjuncts = false)
- } yield negate.end()
+ actionType match {
+ case t: TrimUnconvertibleFilters =>
+ performAction(t.copy(canPartialPushDownConjuncts = false),
child).map(Not)
+ case BuildSearchArgument(builder) =>
+ builder.startNot()
+ updateBuilder(child, builder)
+ builder.end()
+ ()
+ }
- // NOTE: For all case branches dealing with leaf predicates below, the
additional `startAnd()`
- // call is mandatory. ORC `SearchArgument` builder requires that all
leaf predicates must be
- // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+ // NOTE: For all case branches dealing with leaf predicates below, the
additional
+ // `startAnd()` call is mandatory. ORC `SearchArgument` builder
requires that all leaf
+ // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or
`Not`).
case EqualTo(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
- val quotedName = quoteAttributeNameIfNeeded(attribute)
- val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().equals(quotedName, getType(attribute),
castedValue).end())
-
+ actionType match {
+ case _: TrimUnconvertibleFilters => Some(expression)
+ case BuildSearchArgument(builder) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+ builder.startAnd().equals(quotedName, getType(attribute),
castedValue).end()
+ ()
+ }
case EqualNullSafe(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
- val quotedName = quoteAttributeNameIfNeeded(attribute)
- val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute),
castedValue).end())
-
+ actionType match {
+ case _: TrimUnconvertibleFilters => Some(expression)
+ case BuildSearchArgument(builder) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+ builder.startAnd().nullSafeEquals(quotedName, getType(attribute),
castedValue).end()
+ ()
+ }
case LessThan(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
- val quotedName = quoteAttributeNameIfNeeded(attribute)
- val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().lessThan(quotedName, getType(attribute),
castedValue).end())
-
+ actionType match {
+ case _: TrimUnconvertibleFilters => Some(expression)
+ case BuildSearchArgument(builder) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+ builder.startAnd().lessThan(quotedName, getType(attribute),
castedValue).end()
+ ()
+ }
case LessThanOrEqual(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
- val quotedName = quoteAttributeNameIfNeeded(attribute)
- val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute),
castedValue).end())
-
+ actionType match {
+ case _: TrimUnconvertibleFilters => Some(expression)
+ case BuildSearchArgument(builder) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+ builder.startAnd().lessThanEquals(quotedName, getType(attribute),
castedValue).end()
+ ()
+ }
case GreaterThan(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
- val quotedName = quoteAttributeNameIfNeeded(attribute)
- val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startNot().lessThanEquals(quotedName, getType(attribute),
castedValue).end())
-
+ actionType match {
+ case _: TrimUnconvertibleFilters => Some(expression)
+ case BuildSearchArgument(builder) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+ builder.startNot().lessThanEquals(quotedName, getType(attribute),
castedValue).end()
+ ()
+ }
case GreaterThanOrEqual(attribute, value) if
isSearchableType(dataTypeMap(attribute)) =>
- val quotedName = quoteAttributeNameIfNeeded(attribute)
- val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startNot().lessThan(quotedName, getType(attribute),
castedValue).end())
-
+ actionType match {
+ case _: TrimUnconvertibleFilters => Some(expression)
+ case BuildSearchArgument(builder) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+ builder.startNot().lessThan(quotedName, getType(attribute),
castedValue).end()
+ ()
+ }
case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
- val quotedName = quoteAttributeNameIfNeeded(attribute)
- Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
-
+ actionType match {
+ case _: TrimUnconvertibleFilters => Some(expression)
+ case BuildSearchArgument(builder) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ builder.startAnd().isNull(quotedName, getType(attribute)).end()
+ ()
+ }
case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
- val quotedName = quoteAttributeNameIfNeeded(attribute)
- Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
-
+ actionType match {
+ case _: TrimUnconvertibleFilters => Some(expression)
+ case BuildSearchArgument(builder) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ builder.startNot().isNull(quotedName, getType(attribute)).end()
+ ()
+ }
case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
- val quotedName = quoteAttributeNameIfNeeded(attribute)
- val castedValues = values.map(v => castLiteralValue(v,
dataTypeMap(attribute)))
- Some(builder.startAnd().in(quotedName, getType(attribute),
- castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
+ actionType match {
+ case _: TrimUnconvertibleFilters => Some(expression)
+ case BuildSearchArgument(builder) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ val castedValues = values.map(v => castLiteralValue(v,
dataTypeMap(attribute)))
+ builder.startAnd().in(quotedName, getType(attribute),
+ castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
+ ()
+ }
- case _ => None
+ case _ =>
+ actionType match {
+ case _: TrimUnconvertibleFilters => None
+ case BuildSearchArgument(builder) =>
+ throw new IllegalArgumentException(s"Can't build unsupported
filter ${expression}")
+ }
}
}
+
+ private def performFilter(
Review comment:
I removed the `performFilter` method and effectively replaced it with the
`trimUnconvertibleFilters` one. I prefer to keep `updateBuilder` because I
think it gives the right level of abstraction for the overall
`buildSearchArgument` method: trim, and then build. Without these helper
methods, it starts looking like this:
```
performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts =
true), expression)
.map { filter =>
performAction(BuildSearchArgument(builder), filter)
builder
}
```
which is a bit too noisy and convoluted to grasp the high-level structure at
a glance.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]