spark git commit: [SPARK-16134][SQL] optimizer rules for typed filter
Repository: spark Updated Branches: refs/heads/branch-2.0 011befd20 -> 8da431473 [SPARK-16134][SQL] optimizer rules for typed filter ## What changes were proposed in this pull request? This PR adds 3 optimizer rules for typed filter: 1. push typed filter down through `SerializeFromObject` and eliminate the deserialization in filter condition. 2. pull typed filter up through `SerializeFromObject` and eliminate the deserialization in filter condition. 3. combine adjacent typed filters and share the deserialized object among all the condition expressions. This PR also adds `TypedFilter` logical plan, to separate it from the normal filter, so that the concept is clearer and it's easier to write optimizer rules. ## How was this patch tested? `TypedFilterOptimizationSuite` Author: Wenchen Fan Closes #13846 from cloud-fan/filter. (cherry picked from commit d063898bebaaf4ec2aad24c3ac70aabdbf97a190) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8da43147 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8da43147 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8da43147 Branch: refs/heads/branch-2.0 Commit: 8da4314735ed55f259642e2977d8d7bf2212474f Parents: 011befd Author: Wenchen Fan Authored: Thu Jun 30 08:15:08 2016 +0800 Committer: Cheng Lian Committed: Thu Jun 30 08:15:50 2016 +0800 -- .../apache/spark/sql/catalyst/dsl/package.scala | 6 +- .../expressions/ReferenceToExpressions.scala| 1 + .../sql/catalyst/optimizer/Optimizer.scala | 98 +--- .../sql/catalyst/plans/logical/object.scala | 47 +- .../TypedFilterOptimizationSuite.scala | 86 + .../scala/org/apache/spark/sql/Dataset.scala| 12 +-- .../spark/sql/execution/SparkStrategies.scala | 2 + .../scala/org/apache/spark/sql/QueryTest.scala | 1 + 8 files changed, 162 insertions(+), 91 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/spark/blob/8da43147/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 2ca990d..84c9cc8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -293,11 +293,7 @@ package object dsl { def where(condition: Expression): LogicalPlan = Filter(condition, logicalPlan) - def filter[T : Encoder](func: T => Boolean): LogicalPlan = { -val deserialized = logicalPlan.deserialize[T] -val condition = expressions.callFunction(func, BooleanType, deserialized.output.head) -Filter(condition, deserialized).serialize[T] - } + def filter[T : Encoder](func: T => Boolean): LogicalPlan = TypedFilter(func, logicalPlan) def serialize[T : Encoder]: LogicalPlan = CatalystSerde.serialize[T](logicalPlan) http://git-wip-us.apache.org/repos/asf/spark/blob/8da43147/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala index 502d791..127797c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala @@ -45,6 +45,7 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression]) var maxOrdinal = -1 result foreach { case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = b.ordinal + case _ => } if (maxOrdinal > children.length) { return TypeCheckFailure(s"The result expression need $maxOrdinal input expressions, 
but " + http://git-wip-us.apache.org/repos/asf/spark/blob/8da43147/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f24f8b7..aa90735 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
spark git commit: [SPARK-16134][SQL] optimizer rules for typed filter
Repository: spark Updated Branches: refs/heads/master 2eaabfa41 -> d063898be [SPARK-16134][SQL] optimizer rules for typed filter ## What changes were proposed in this pull request? This PR adds 3 optimizer rules for typed filter: 1. push typed filter down through `SerializeFromObject` and eliminate the deserialization in filter condition. 2. pull typed filter up through `SerializeFromObject` and eliminate the deserialization in filter condition. 3. combine adjacent typed filters and share the deserialized object among all the condition expressions. This PR also adds `TypedFilter` logical plan, to separate it from the normal filter, so that the concept is clearer and it's easier to write optimizer rules. ## How was this patch tested? `TypedFilterOptimizationSuite` Author: Wenchen Fan Closes #13846 from cloud-fan/filter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d063898b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d063898b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d063898b Branch: refs/heads/master Commit: d063898bebaaf4ec2aad24c3ac70aabdbf97a190 Parents: 2eaabfa Author: Wenchen Fan Authored: Thu Jun 30 08:15:08 2016 +0800 Committer: Cheng Lian Committed: Thu Jun 30 08:15:08 2016 +0800 -- .../apache/spark/sql/catalyst/dsl/package.scala | 6 +- .../expressions/ReferenceToExpressions.scala| 1 + .../sql/catalyst/optimizer/Optimizer.scala | 98 +--- .../sql/catalyst/plans/logical/object.scala | 47 +- .../TypedFilterOptimizationSuite.scala | 86 + .../scala/org/apache/spark/sql/Dataset.scala| 12 +-- .../spark/sql/execution/SparkStrategies.scala | 2 + .../scala/org/apache/spark/sql/QueryTest.scala | 1 + 8 files changed, 162 insertions(+), 91 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d063898b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 2ca990d..84c9cc8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -293,11 +293,7 @@ package object dsl { def where(condition: Expression): LogicalPlan = Filter(condition, logicalPlan) - def filter[T : Encoder](func: T => Boolean): LogicalPlan = { -val deserialized = logicalPlan.deserialize[T] -val condition = expressions.callFunction(func, BooleanType, deserialized.output.head) -Filter(condition, deserialized).serialize[T] - } + def filter[T : Encoder](func: T => Boolean): LogicalPlan = TypedFilter(func, logicalPlan) def serialize[T : Encoder]: LogicalPlan = CatalystSerde.serialize[T](logicalPlan) http://git-wip-us.apache.org/repos/asf/spark/blob/d063898b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala index 502d791..127797c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala @@ -45,6 +45,7 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression]) var maxOrdinal = -1 result foreach { case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = b.ordinal + case _ => } if (maxOrdinal > children.length) { return TypeCheckFailure(s"The result expression need $maxOrdinal input expressions, but " + 
http://git-wip-us.apache.org/repos/asf/spark/blob/d063898b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9bc8cea..842d6bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -21,6 +21,7 @@ import scala.annotation.tailrec import scala.collection.immutable.HashSet import sca