spark git commit: [SPARK-16134][SQL] optimizer rules for typed filter

2016-06-29 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 011befd20 -> 8da431473


[SPARK-16134][SQL] optimizer rules for typed filter

## What changes were proposed in this pull request?

This PR adds 3 optimizer rules for typed filter:

1. push typed filter down through `SerializeFromObject` and eliminate the 
deserialization in filter condition.
2. pull typed filter up through `DeserializeToObject` and push the 
deserialization into filter condition.
3. combine adjacent typed filters and share the deserialized object among all 
the condition expressions.

This PR also adds `TypedFilter` logical plan, to separate it from normal 
filter, so that the concept is more clear and it's easier to write optimizer 
rules.

## How was this patch tested?

`TypedFilterOptimizationSuite`

Author: Wenchen Fan 

Closes #13846 from cloud-fan/filter.

(cherry picked from commit d063898bebaaf4ec2aad24c3ac70aabdbf97a190)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8da43147
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8da43147
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8da43147

Branch: refs/heads/branch-2.0
Commit: 8da4314735ed55f259642e2977d8d7bf2212474f
Parents: 011befd
Author: Wenchen Fan 
Authored: Thu Jun 30 08:15:08 2016 +0800
Committer: Cheng Lian 
Committed: Thu Jun 30 08:15:50 2016 +0800

--
 .../apache/spark/sql/catalyst/dsl/package.scala |  6 +-
 .../expressions/ReferenceToExpressions.scala|  1 +
 .../sql/catalyst/optimizer/Optimizer.scala  | 98 +---
 .../sql/catalyst/plans/logical/object.scala | 47 +-
 .../TypedFilterOptimizationSuite.scala  | 86 +
 .../scala/org/apache/spark/sql/Dataset.scala| 12 +--
 .../spark/sql/execution/SparkStrategies.scala   |  2 +
 .../scala/org/apache/spark/sql/QueryTest.scala  |  1 +
 8 files changed, 162 insertions(+), 91 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8da43147/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 2ca990d..84c9cc8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -293,11 +293,7 @@ package object dsl {
 
   def where(condition: Expression): LogicalPlan = Filter(condition, 
logicalPlan)
 
-  def filter[T : Encoder](func: T => Boolean): LogicalPlan = {
-val deserialized = logicalPlan.deserialize[T]
-val condition = expressions.callFunction(func, BooleanType, 
deserialized.output.head)
-Filter(condition, deserialized).serialize[T]
-  }
+  def filter[T : Encoder](func: T => Boolean): LogicalPlan = 
TypedFilter(func, logicalPlan)
 
   def serialize[T : Encoder]: LogicalPlan = 
CatalystSerde.serialize[T](logicalPlan)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8da43147/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
index 502d791..127797c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
@@ -45,6 +45,7 @@ case class ReferenceToExpressions(result: Expression, 
children: Seq[Expression])
 var maxOrdinal = -1
 result foreach {
   case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = 
b.ordinal
+  case _ =>
 }
 if (maxOrdinal > children.length) {
   return TypeCheckFailure(s"The result expression need $maxOrdinal input 
expressions, but " +

http://git-wip-us.apache.org/repos/asf/spark/blob/8da43147/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f24f8b7..aa90735 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

spark git commit: [SPARK-16134][SQL] optimizer rules for typed filter

2016-06-29 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 2eaabfa41 -> d063898be


[SPARK-16134][SQL] optimizer rules for typed filter

## What changes were proposed in this pull request?

This PR adds 3 optimizer rules for typed filter:

1. push typed filter down through `SerializeFromObject` and eliminate the 
deserialization in filter condition.
2. pull typed filter up through `DeserializeToObject` and push the 
deserialization into filter condition.
3. combine adjacent typed filters and share the deserialized object among all 
the condition expressions.

This PR also adds `TypedFilter` logical plan, to separate it from normal 
filter, so that the concept is more clear and it's easier to write optimizer 
rules.

## How was this patch tested?

`TypedFilterOptimizationSuite`

Author: Wenchen Fan 

Closes #13846 from cloud-fan/filter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d063898b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d063898b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d063898b

Branch: refs/heads/master
Commit: d063898bebaaf4ec2aad24c3ac70aabdbf97a190
Parents: 2eaabfa
Author: Wenchen Fan 
Authored: Thu Jun 30 08:15:08 2016 +0800
Committer: Cheng Lian 
Committed: Thu Jun 30 08:15:08 2016 +0800

--
 .../apache/spark/sql/catalyst/dsl/package.scala |  6 +-
 .../expressions/ReferenceToExpressions.scala|  1 +
 .../sql/catalyst/optimizer/Optimizer.scala  | 98 +---
 .../sql/catalyst/plans/logical/object.scala | 47 +-
 .../TypedFilterOptimizationSuite.scala  | 86 +
 .../scala/org/apache/spark/sql/Dataset.scala| 12 +--
 .../spark/sql/execution/SparkStrategies.scala   |  2 +
 .../scala/org/apache/spark/sql/QueryTest.scala  |  1 +
 8 files changed, 162 insertions(+), 91 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d063898b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 2ca990d..84c9cc8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -293,11 +293,7 @@ package object dsl {
 
   def where(condition: Expression): LogicalPlan = Filter(condition, 
logicalPlan)
 
-  def filter[T : Encoder](func: T => Boolean): LogicalPlan = {
-val deserialized = logicalPlan.deserialize[T]
-val condition = expressions.callFunction(func, BooleanType, 
deserialized.output.head)
-Filter(condition, deserialized).serialize[T]
-  }
+  def filter[T : Encoder](func: T => Boolean): LogicalPlan = 
TypedFilter(func, logicalPlan)
 
   def serialize[T : Encoder]: LogicalPlan = 
CatalystSerde.serialize[T](logicalPlan)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d063898b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
index 502d791..127797c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
@@ -45,6 +45,7 @@ case class ReferenceToExpressions(result: Expression, 
children: Seq[Expression])
 var maxOrdinal = -1
 result foreach {
   case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = 
b.ordinal
+  case _ =>
 }
 if (maxOrdinal > children.length) {
   return TypeCheckFailure(s"The result expression need $maxOrdinal input 
expressions, but " +

http://git-wip-us.apache.org/repos/asf/spark/blob/d063898b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9bc8cea..842d6bc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -21,6 +21,7 @@ import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
 import sca