Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8da431473 -> e1bdf1e02
Revert "[SPARK-16134][SQL] optimizer rules for typed filter" This reverts commit 8da4314735ed55f259642e2977d8d7bf2212474f. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1bdf1e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1bdf1e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1bdf1e0 Branch: refs/heads/branch-2.0 Commit: e1bdf1e02483bf513b6e012e8921d440a5efbc11 Parents: 8da4314 Author: Cheng Lian <l...@databricks.com> Authored: Thu Jun 30 08:17:43 2016 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Thu Jun 30 08:17:43 2016 +0800 ---------------------------------------------------------------------- .../apache/spark/sql/catalyst/dsl/package.scala | 6 +- .../expressions/ReferenceToExpressions.scala | 1 - .../sql/catalyst/optimizer/Optimizer.scala | 98 +++++++++++--------- .../sql/catalyst/plans/logical/object.scala | 47 +--------- .../TypedFilterOptimizationSuite.scala | 86 ++++------------- .../scala/org/apache/spark/sql/Dataset.scala | 12 ++- .../spark/sql/execution/SparkStrategies.scala | 2 - .../scala/org/apache/spark/sql/QueryTest.scala | 1 - 8 files changed, 91 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 84c9cc8..2ca990d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -293,7 +293,11 @@ package object dsl { def where(condition: Expression): LogicalPlan = Filter(condition, logicalPlan) - def filter[T : Encoder](func: T => Boolean): LogicalPlan = TypedFilter(func, logicalPlan) + def filter[T : Encoder](func: T => Boolean): LogicalPlan = { + val deserialized = logicalPlan.deserialize[T] + val condition = expressions.callFunction(func, BooleanType, deserialized.output.head) + Filter(condition, deserialized).serialize[T] + } def serialize[T : Encoder]: LogicalPlan = CatalystSerde.serialize[T](logicalPlan) http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala index 127797c..502d791 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala @@ -45,7 +45,6 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression]) var maxOrdinal = -1 result foreach { case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = b.ordinal - case _ => } if (maxOrdinal > children.length) { return TypeCheckFailure(s"The result expression need $maxOrdinal input expressions, but " + 
http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
index 127797c..502d791 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
@@ -45,7 +45,6 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression])
     var maxOrdinal = -1
     result foreach {
       case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = b.ordinal
-      case _ =>
     }
     if (maxOrdinal > children.length) {
       return TypeCheckFailure(s"The result expression need $maxOrdinal input expressions, but " +


http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index aa90735..f24f8b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -21,7 +21,6 @@ import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
@@ -110,7 +109,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates) ::
     Batch("Typed Filter Optimization", fixedPoint,
-      CombineTypedFilters) ::
+      EmbedSerializerInFilter,
+      RemoveAliasOnlyProject) ::
     Batch("LocalRelation", fixedPoint,
       ConvertToLocalRelation) ::
     Batch("OptimizeCodegen", Once,
@@ -205,33 +205,15 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
 object EliminateSerialization extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case d @ DeserializeToObject(_, _, s: SerializeFromObject)
-        if d.outputObjAttr.dataType == s.inputObjAttr.dataType =>
+        if d.outputObjectType == s.inputObjectType =>
       // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
       // We will remove it later in RemoveAliasOnlyProject rule.
-      val objAttr = Alias(s.inputObjAttr, s.inputObjAttr.name)(exprId = d.outputObjAttr.exprId)
+      val objAttr =
+        Alias(s.child.output.head, s.child.output.head.name)(exprId = d.output.head.exprId)
       Project(objAttr :: Nil, s.child)
-
     case a @ AppendColumns(_, _, _, s: SerializeFromObject)
-        if a.deserializer.dataType == s.inputObjAttr.dataType =>
+        if a.deserializer.dataType == s.inputObjectType =>
       AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
-
-    // If there is a `SerializeFromObject` under typed filter and its input object type is same with
-    // the typed filter's deserializer, we can convert typed filter to normal filter without
-    // deserialization in condition, and push it down through `SerializeFromObject`.
-    // e.g. `ds.map(...).filter(...)` can be optimized by this rule to save extra deserialization,
-    // but `ds.map(...).as[AnotherType].filter(...)` can not be optimized.
-    case f @ TypedFilter(_, _, s: SerializeFromObject)
-        if f.deserializer.dataType == s.inputObjAttr.dataType =>
-      s.copy(child = f.withObjectProducerChild(s.child))
-
-    // If there is a `DeserializeToObject` upon typed filter and its output object type is same with
-    // the typed filter's deserializer, we can convert typed filter to normal filter without
-    // deserialization in condition, and pull it up through `DeserializeToObject`.
-    // e.g. `ds.filter(...).map(...)` can be optimized by this rule to save extra deserialization,
-    // but `ds.filter(...).as[AnotherType].map(...)` can not be optimized.
-    case d @ DeserializeToObject(_, _, f: TypedFilter)
-        if d.outputObjAttr.dataType == f.deserializer.dataType =>
-      f.withObjectProducerChild(d.copy(child = f.child))
   }
 }
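For intuition about what EliminateSerialization still covers after this revert: the surviving first case collapses a DeserializeToObject sitting directly on top of a SerializeFromObject, which is exactly the shape produced by back-to-back typed maps. A hedged, self-contained sketch; the object and app names here are ours, not part of the patch:

    import org.apache.spark.sql.SparkSession

    object EliminateSerializationDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("serde-elimination-demo")
          .getOrCreate()
        import spark.implicits._

        val ds = Seq((1, 2), (3, 4)).toDS()
        // Each typed map plans as DeserializeToObject -> MapElements -> SerializeFromObject.
        // Chaining two maps therefore puts a DeserializeToObject directly over a
        // SerializeFromObject; the rule replaces that pair with a cheap Project, so the
        // object flows straight from the first closure into the second.
        val chained = ds.map(t => t._1 + t._2).map(_ * 2)
        chained.explain(true) // the optimized plan shows a single serde round trip
        assert(chained.collect().sorted.sameElements(Array(6, 14)))
        spark.stop()
      }
    }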
@@ -1624,30 +1606,54 @@ case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[Logic
 }
 
 /**
- * Combines two adjacent [[TypedFilter]]s, which operate on same type object in condition, into one,
- * merging the filter functions into one conjunctive function.
+ * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
+ * [[SerializeFromObject]] above it. If these serializations can't be eliminated, we should embed
+ * the deserializer in filter condition to save the extra serialization at last.
  */
-object CombineTypedFilters extends Rule[LogicalPlan] {
+object EmbedSerializerInFilter extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case t1 @ TypedFilter(_, _, t2 @ TypedFilter(_, _, child))
-        if t1.deserializer.dataType == t2.deserializer.dataType =>
-      TypedFilter(combineFilterFunction(t2.func, t1.func), t1.deserializer, child)
-  }
-
-  private def combineFilterFunction(func1: AnyRef, func2: AnyRef): Any => Boolean = {
-    (func1, func2) match {
-      case (f1: FilterFunction[_], f2: FilterFunction[_]) =>
-        input => f1.asInstanceOf[FilterFunction[Any]].call(input) &&
-          f2.asInstanceOf[FilterFunction[Any]].call(input)
-      case (f1: FilterFunction[_], f2) =>
-        input => f1.asInstanceOf[FilterFunction[Any]].call(input) &&
-          f2.asInstanceOf[Any => Boolean](input)
-      case (f1, f2: FilterFunction[_]) =>
-        input => f1.asInstanceOf[Any => Boolean].apply(input) &&
-          f2.asInstanceOf[FilterFunction[Any]].call(input)
-      case (f1, f2) =>
-        input => f1.asInstanceOf[Any => Boolean].apply(input) &&
-          f2.asInstanceOf[Any => Boolean].apply(input)
+    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
+      // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
+      // optimization rule also relies on this assumption. However, Dataset typed filter operator
+      // does introduce schema changes in some cases. Thus, we only enable this optimization when
+      //
+      // 1. either input and output schemata are exactly the same, or
+      // 2. both input and output schemata are single-field schema and share the same type.
+      //
+      // The 2nd case is included because encoders for primitive types always have only a single
+      // field with hard-coded field name "value".
+      // TODO Cleans this up after fixing SPARK-15632.
+      if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
+
+      val numObjects = condition.collect {
+        case a: Attribute if a == d.output.head => a
+      }.length
+
+      if (numObjects > 1) {
+        // If the filter condition references the object more than once, we should not embed
+        // the deserializer in it, as the deserialization would happen many times and slow down
+        // the execution.
+        // TODO: we can still embed it if we can make sure subexpression elimination works here.
+        s
+      } else {
+        val newCondition = condition transform {
+          case a: Attribute if a == d.output.head => d.deserializer
+        }
+        val filter = Filter(newCondition, d.child)
+
+        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
+        // We will remove it later in RemoveAliasOnlyProject rule.
+        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
+          Alias(fout, fout.name)(exprId = sout.exprId)
+        }
+        Project(objAttrs, filter)
+      }
+  }
+
+  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
+    (lhs, rhs) match {
+      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
+      case _ => false
     }
   }
 }
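The schema guard above is the heart of the rule: it only fires when the filter provably preserves the schema. For clarity, here is the samePrimitiveType check restated as a standalone snippet that can run outside the optimizer; the example schemas are ours:

    import org.apache.spark.sql.types._

    // Two single-field schemas count as equivalent if their lone fields share a
    // data type, whatever the field names -- this covers primitive-type encoders,
    // whose single field is always named "value".
    def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = (lhs, rhs) match {
      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
      case _ => false
    }

    val value = new StructType().add("value", IntegerType) // e.g. Dataset[Int]
    val id    = new StructType().add("id", IntegerType)    // same type, different name
    val pair  = new StructType().add("_1", IntegerType).add("_2", IntegerType)

    assert(samePrimitiveType(value, id))    // single fields of the same type
    assert(!samePrimitiveType(value, pair)) // arity differs, so the rule backs off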
http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index e1890ed..7beeeb4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -17,15 +17,11 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import scala.language.existentials
-
-import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Encoder, Row}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.objects.Invoke
 import org.apache.spark.sql.types._
 
 object CatalystSerde {
@@ -49,11 +45,13 @@ object CatalystSerde {
  */
 trait ObjectProducer extends LogicalPlan {
   // The attribute that reference to the single object field this operator outputs.
-  def outputObjAttr: Attribute
+  protected def outputObjAttr: Attribute
 
   override def output: Seq[Attribute] = outputObjAttr :: Nil
 
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
+
+  def outputObjectType: DataType = outputObjAttr.dataType
 }
 
 /**
@@ -66,7 +64,7 @@ trait ObjectConsumer extends UnaryNode {
   // This operator always need all columns of its child, even it doesn't reference to.
   override def references: AttributeSet = child.outputSet
 
-  def inputObjAttr: Attribute = child.output.head
+  def inputObjectType: DataType = child.output.head.dataType
 }
 
 /**
@@ -169,43 +167,6 @@ case class MapElements(
     outputObjAttr: Attribute,
     child: LogicalPlan) extends ObjectConsumer with ObjectProducer
 
-object TypedFilter {
-  def apply[T : Encoder](func: AnyRef, child: LogicalPlan): TypedFilter = {
-    TypedFilter(func, UnresolvedDeserializer(encoderFor[T].deserializer), child)
-  }
-}
-
-/**
- * A relation produced by applying `func` to each element of the `child` and filter them by the
- * resulting boolean value.
- *
- * This is logically equal to a normal [[Filter]] operator whose condition expression is decoding
- * the input row to object and apply the given function with decoded object. However we need the
- * encapsulation of [[TypedFilter]] to make the concept more clear and make it easier to write
- * optimizer rules.
- */
-case class TypedFilter(
-    func: AnyRef,
-    deserializer: Expression,
-    child: LogicalPlan) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output
-
-  def withObjectProducerChild(obj: LogicalPlan): Filter = {
-    assert(obj.output.length == 1)
-    Filter(typedCondition(obj.output.head), obj)
-  }
-
-  def typedCondition(input: Expression): Expression = {
-    val (funcClass, methodName) = func match {
-      case m: FilterFunction[_] => classOf[FilterFunction[_]] -> "call"
-      case _ => classOf[Any => Boolean] -> "apply"
-    }
-    val funcObj = Literal.create(func, ObjectType(funcClass))
-    Invoke(funcObj, methodName, BooleanType, input :: Nil)
-  }
-}
-
 /** Factory for constructing new `AppendColumn` nodes. */
 object AppendColumns {
   def apply[T : Encoder, U : Encoder](


http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
index 56f096f..63d87bf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, TypedFilter}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types.BooleanType
 
@@ -32,91 +33,44 @@ class TypedFilterOptimizationSuite extends PlanTest {
     val batches =
       Batch("EliminateSerialization", FixedPoint(50),
        EliminateSerialization) ::
-      Batch("CombineTypedFilters", FixedPoint(50),
-        CombineTypedFilters) :: Nil
+      Batch("EmbedSerializerInFilter", FixedPoint(50),
+        EmbedSerializerInFilter) :: Nil
   }
 
   implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
 
-  test("filter after serialize with the same object type") {
+  test("back to back filter") {
     val input = LocalRelation('_1.int, '_2.int)
-    val f = (i: (Int, Int)) => i._1 > 0
+    val f1 = (i: (Int, Int)) => i._1 > 0
+    val f2 = (i: (Int, Int)) => i._2 > 0
 
-    val query = input
-      .deserialize[(Int, Int)]
-      .serialize[(Int, Int)]
-      .filter(f).analyze
+    val query = input.filter(f1).filter(f2).analyze
     val optimized = Optimize.execute(query)
 
-    val expected = input
-      .deserialize[(Int, Int)]
-      .where(callFunction(f, BooleanType, 'obj))
+    val expected = input.deserialize[(Int, Int)]
+      .where(callFunction(f1, BooleanType, 'obj))
+      .select('obj.as("obj"))
+      .where(callFunction(f2, BooleanType, 'obj))
       .serialize[(Int, Int)].analyze
 
     comparePlans(optimized, expected)
   }
 
-  test("filter after serialize with different object types") {
-    val input = LocalRelation('_1.int, '_2.int)
-    val f = (i: OtherTuple) => i._1 > 0
-
-    val query = input
-      .deserialize[(Int, Int)]
-      .serialize[(Int, Int)]
-      .filter(f).analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, query)
-  }
-
-  test("filter before deserialize with the same object type") {
+  // TODO: Remove this after we completely fix SPARK-15632 by adding optimization rules
+  // for typed filters.
+  ignore("embed deserializer in typed filter condition if there is only one filter") {
     val input = LocalRelation('_1.int, '_2.int)
     val f = (i: (Int, Int)) => i._1 > 0
 
-    val query = input
-      .filter(f)
-      .deserialize[(Int, Int)]
-      .serialize[(Int, Int)].analyze
+    val query = input.filter(f).analyze
    val optimized = Optimize.execute(query)
 
-    val expected = input
-      .deserialize[(Int, Int)]
-      .where(callFunction(f, BooleanType, 'obj))
-      .serialize[(Int, Int)].analyze
+    val deserializer = UnresolvedDeserializer(encoderFor[(Int, Int)].deserializer)
+    val condition = callFunction(f, BooleanType, deserializer)
+    val expected = input.where(condition).select('_1.as("_1"), '_2.as("_2")).analyze
 
     comparePlans(optimized, expected)
   }
-
-  test("filter before deserialize with different object types") {
-    val input = LocalRelation('_1.int, '_2.int)
-    val f = (i: OtherTuple) => i._1 > 0
-
-    val query = input
-      .filter(f)
-      .deserialize[(Int, Int)]
-      .serialize[(Int, Int)].analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, query)
-  }
-
-  test("back to back filter with the same object type") {
-    val input = LocalRelation('_1.int, '_2.int)
-    val f1 = (i: (Int, Int)) => i._1 > 0
-    val f2 = (i: (Int, Int)) => i._2 > 0
-
-    val query = input.filter(f1).filter(f2).analyze
-    val optimized = Optimize.execute(query)
-    assert(optimized.collect { case t: TypedFilter => t }.length == 1)
-  }
-
-  test("back to back filter with different object types") {
-    val input = LocalRelation('_1.int, '_2.int)
-    val f1 = (i: (Int, Int)) => i._1 > 0
-    val f2 = (i: OtherTuple) => i._2 > 0
-
-    val query = input.filter(f1).filter(f2).analyze
-    val optimized = Optimize.execute(query)
-    assert(optimized.collect { case t: TypedFilter => t }.length == 2)
-  }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 8e914fc..067cbec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1964,7 +1964,11 @@ class Dataset[T] private[sql](
    */
   @Experimental
   def filter(func: T => Boolean): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, logicalPlan))
+    val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
+    val function = Literal.create(func, ObjectType(classOf[T => Boolean]))
+    val condition = Invoke(function, "apply", BooleanType, deserializer :: Nil)
+    val filter = Filter(condition, logicalPlan)
+    withTypedPlan(filter)
   }
 
   /**
@@ -1977,7 +1981,11 @@ class Dataset[T] private[sql](
    */
   @Experimental
   def filter(func: FilterFunction[T]): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, logicalPlan))
+    val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
+    val function = Literal.create(func, ObjectType(classOf[FilterFunction[T]]))
+    val condition = Invoke(function, "call", BooleanType, deserializer :: Nil)
+    val filter = Filter(condition, logicalPlan)
+    withTypedPlan(filter)
   }
 
   /**
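For completeness, a usage sketch of the two restored filter overloads; the session setup is ours, and only the filter calls come from the patch. Both embed the deserializer in the Filter condition -- Invoke(func, "apply", ...) for the Scala closure and Invoke(func, "call", ...) for the Java FilterFunction -- so the planner handles them through the ordinary Filter path:

    import org.apache.spark.api.java.function.FilterFunction
    import org.apache.spark.sql.SparkSession

    object TypedFilterUsageDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("typed-filter-demo")
          .getOrCreate()
        import spark.implicits._

        val ds = Seq(1, 2, 3, 4, 5).toDS()

        // Scala overload: condition is Invoke(func, "apply", ...) on the deserialized row.
        val viaScala = ds.filter((i: Int) => i % 2 == 0)

        // Java overload: condition is Invoke(func, "call", ...) on the deserialized row.
        val viaJava = ds.filter(new FilterFunction[Int] {
          override def call(i: Int): Boolean = i % 2 == 0
        })

        // With TypedFilter gone, both plan through logical.Filter -> FilterExec.
        assert(viaScala.collect().sorted.sameElements(viaJava.collect().sorted))
        spark.stop()
      }
    }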
http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5e643ea..b619d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -385,8 +385,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.ProjectExec(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
         execution.FilterExec(condition, planLater(child)) :: Nil
-      case f: logical.TypedFilter =>
-        execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil
       case e @ logical.Expand(_, _, child) =>
         execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
       case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>


http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index ab50513..b15f38c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -238,7 +238,6 @@ abstract class QueryTest extends PlanTest {
       case _: ObjectConsumer => return
       case _: ObjectProducer => return
      case _: AppendColumns => return
-      case _: TypedFilter => return
       case _: LogicalRelation => return
       case p if p.getClass.getSimpleName == "MetastoreRelation" => return
       case _: MemoryPlan => return


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org