Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/13846#discussion_r68750055
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -1637,55 +1654,31 @@ case class GetCurrentDatabase(sessionCatalog:
SessionCatalog) extends Rule[Logic
}
/**
- * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]]
beneath it and a
- * [[SerializeFromObject]] above it. If these serializations can't be
eliminated, we should embed
- * the deserializer in filter condition to save the extra serialization at
last.
+ * Combines all adjacent [[TypedFilter]]s, which operate on same type
object in condition, into a
+ * single [[Filter]].
*/
-object EmbedSerializerInFilter extends Rule[LogicalPlan] {
+object CombineTypedFilters extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case s @ SerializeFromObject(_, Filter(condition, d:
DeserializeToObject))
- // SPARK-15632: Conceptually, filter operator should never introduce
schema change. This
- // optimization rule also relies on this assumption. However,
Dataset typed filter operator
- // does introduce schema changes in some cases. Thus, we only enable
this optimization when
- //
- // 1. either input and output schemata are exactly the same, or
- // 2. both input and output schemata are single-field schema and
share the same type.
- //
- // The 2nd case is included because encoders for primitive types
always have only a single
- // field with hard-coded field name "value".
- // TODO Cleans this up after fixing SPARK-15632.
- if s.schema == d.child.schema || samePrimitiveType(s.schema,
d.child.schema) =>
-
- val numObjects = condition.collect {
- case a: Attribute if a == d.output.head => a
- }.length
-
- if (numObjects > 1) {
- // If the filter condition references the object more than one
times, we should not embed
- // deserializer in it as the deserialization will happen many
times and slow down the
- // execution.
- // TODO: we can still embed it if we can make sure subexpression
elimination works here.
- s
+ case t @ TypedFilter(_, deserializer, child) =>
+ val filters = collectTypedFiltersOnSameTypeObj(child,
deserializer.dataType, ArrayBuffer(t))
+ if (filters.length > 1) {
+ val objHolder = BoundReference(0, deserializer.dataType, nullable
= false)
+ val condition = filters.map(_.getCondition(objHolder)).reduce(And)
+ Filter(ReferenceToExpressions(condition, deserializer :: Nil),
filters.last.child)
} else {
- val newCondition = condition transform {
- case a: Attribute if a == d.output.head => d.deserializer
- }
- val filter = Filter(newCondition, d.child)
-
- // Adds an extra Project here, to preserve the output expr id of
`SerializeFromObject`.
- // We will remove it later in RemoveAliasOnlyProject rule.
- val objAttrs = filter.output.zip(s.output).map { case (fout, sout)
=>
- Alias(fout, fout.name)(exprId = sout.exprId)
- }
- Project(objAttrs, filter)
+ t
}
}
- def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
- (lhs, rhs) match {
- case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType
== f2.dataType
- case _ => false
- }
+ @tailrec
+ private def collectTypedFiltersOnSameTypeObj(
+ plan: LogicalPlan,
+ objType: DataType,
+ filters: ArrayBuffer[TypedFilter]): Array[TypedFilter] = plan match {
+ case t: TypedFilter if t.deserializer.dataType == objType =>
+ filters += t
--- End diff --
Shall we prepend rather than append the found filters here? Otherwise, the filter
predicates will be evaluated in reverse order after being combined. It would also
be nice to add a comment explaining this.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]