Repository: spark
Updated Branches:
  refs/heads/branch-2.0 94d52d765 -> f91614f36


[SPARK-17270][SQL] Move object optimization rules into their own file (branch-2.0)

## What changes were proposed in this pull request?
As part of breaking Optimizer.scala apart, this patch moves the various Dataset 
object optimization rules into a single file. I'm submitting separate pull 
requests so we can more easily merge this into branch-2.0 and simplify 
optimizer backports.

This is https://github.com/apache/spark/pull/14839 but for branch-2.0.
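
For context, the moved rules (`EliminateSerialization` and `EmbedSerializerInFilter`) 
target typed Dataset operations. The sketch below is illustrative only (not part of 
this patch) and uses a throwaway local SparkSession; it just builds the kinds of plans 
those rules optimize:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: a throwaway local session to build the plans below.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("object-rules-demo")
  .getOrCreate()
import spark.implicits._

// Back-to-back typed maps plan a DeserializeToObject directly on top of a
// SerializeFromObject; EliminateSerialization should remove that redundant pair.
val mapped = Seq(1, 2, 3).toDS().map(_ + 1).map(_ * 2)

// A typed filter is planned as SerializeFromObject(Filter(DeserializeToObject));
// when the input and output schemas line up, EmbedSerializerInFilter embeds the
// deserializer in the filter condition instead.
val filtered = Seq(1, 2, 3, 4).toDS().filter(_ % 2 == 0)

// The optimized plan in the extended explain output shows the rules' effect.
mapped.explain(true)
filtered.explain(true)
```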

## How was this patch tested?
This should be covered by existing tests.

Author: Reynold Xin <r...@databricks.com>

Closes #14843 from rxin/SPARK-17270-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f91614f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f91614f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f91614f3

Branch: refs/heads/branch-2.0
Commit: f91614f36472957355fad7d69d66327807fe80c8
Parents: 94d52d7
Author: Reynold Xin <r...@databricks.com>
Authored: Sat Aug 27 00:31:49 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Aug 27 00:31:49 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  72 -------------
 .../spark/sql/catalyst/optimizer/objects.scala  | 101 +++++++++++++++++++
 2 files changed, 101 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f91614f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f3f1d21..15d33c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -187,25 +187,6 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
 }
 
 /**
- * Removes cases where we are unnecessarily going between the object and serialized (InternalRow)
- * representation of data item.  For example back to back map operations.
- */
-object EliminateSerialization extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
-        if d.outputObjectType == s.inputObjectType =>
-      // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
-      // We will remove it later in RemoveAliasOnlyProject rule.
-      val objAttr =
-        Alias(s.child.output.head, s.child.output.head.name)(exprId = d.output.head.exprId)
-      Project(objAttr :: Nil, s.child)
-    case a @ AppendColumns(_, _, _, s: SerializeFromObject)
-        if a.deserializer.dataType == s.inputObjectType =>
-      AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
-  }
-}
-
-/**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
 object LimitPushDown extends Rule[LogicalPlan] {
@@ -1583,59 +1564,6 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
 }
 
 /**
- * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
- * [[SerializeFromObject]] above it.  If these serializations can't be eliminated, we should embed
- * the deserializer in filter condition to save the extra serialization at last.
- */
-object EmbedSerializerInFilter extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
-      // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
-      // optimization rule also relies on this assumption. However, Dataset typed filter operator
-      // does introduce schema changes in some cases. Thus, we only enable this optimization when
-      //
-      //  1. either input and output schemata are exactly the same, or
-      //  2. both input and output schemata are single-field schema and share the same type.
-      //
-      // The 2nd case is included because encoders for primitive types always have only a single
-      // field with hard-coded field name "value".
-      // TODO Cleans this up after fixing SPARK-15632.
-      if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
-
-      val numObjects = condition.collect {
-        case a: Attribute if a == d.output.head => a
-      }.length
-
-      if (numObjects > 1) {
-        // If the filter condition references the object more than one times, we should not embed
-        // deserializer in it as the deserialization will happen many times and slow down the
-        // execution.
-        // TODO: we can still embed it if we can make sure subexpression elimination works here.
-        s
-      } else {
-        val newCondition = condition transform {
-          case a: Attribute if a == d.output.head => d.deserializer
-        }
-        val filter = Filter(newCondition, d.child)
-
-        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
-        // We will remove it later in RemoveAliasOnlyProject rule.
-        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
-          Alias(fout, fout.name)(exprId = sout.exprId)
-        }
-        Project(objAttrs, filter)
-      }
-  }
-
-  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
-    (lhs, rhs) match {
-      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
-      case _ => false
-    }
-  }
-}
-
-/**
  * This rule rewrites predicate sub-queries into left semi/anti joins. The following predicates
  * are supported:
  * a. EXISTS/NOT EXISTS will be rewritten as semi/anti join, unresolved conditions in Filter

http://git-wip-us.apache.org/repos/asf/spark/blob/f91614f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
new file mode 100644
index 0000000..8a25cee
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.types.StructType
+
+/*
+ * This file defines optimization rules related to object manipulation (for the Dataset API).
+ */
+
+
+/**
+ * Removes cases where we are unnecessarily going between the object and serialized (InternalRow)
+ * representation of data item.  For example back to back map operations.
+ */
+object EliminateSerialization extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
+        if d.outputObjectType == s.inputObjectType =>
+      // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
+      // We will remove it later in RemoveAliasOnlyProject rule.
+      val objAttr =
+        Alias(s.child.output.head, s.child.output.head.name)(exprId = d.output.head.exprId)
+      Project(objAttr :: Nil, s.child)
+    case a @ AppendColumns(_, _, _, s: SerializeFromObject)
+        if a.deserializer.dataType == s.inputObjectType =>
+      AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
+  }
+}
+
+
+/**
+ * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
+ * [[SerializeFromObject]] above it.  If these serializations can't be eliminated, we should embed
+ * the deserializer in filter condition to save the extra serialization at last.
+ */
+object EmbedSerializerInFilter extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
+      // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
+      // optimization rule also relies on this assumption. However, Dataset typed filter operator
+      // does introduce schema changes in some cases. Thus, we only enable this optimization when
+      //
+      //  1. either input and output schemata are exactly the same, or
+      //  2. both input and output schemata are single-field schema and share the same type.
+      //
+      // The 2nd case is included because encoders for primitive types always have only a single
+      // field with hard-coded field name "value".
+      // TODO Cleans this up after fixing SPARK-15632.
+      if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
+
+      val numObjects = condition.collect {
+        case a: Attribute if a == d.output.head => a
+      }.length
+
+      if (numObjects > 1) {
+        // If the filter condition references the object more than one times, we should not embed
+        // deserializer in it as the deserialization will happen many times and slow down the
+        // execution.
+        // TODO: we can still embed it if we can make sure subexpression elimination works here.
+        s
+      } else {
+        val newCondition = condition transform {
+          case a: Attribute if a == d.output.head => d.deserializer
+        }
+        val filter = Filter(newCondition, d.child)
+
+        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
+        // We will remove it later in RemoveAliasOnlyProject rule.
+        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
+          Alias(fout, fout.name)(exprId = sout.exprId)
+        }
+        Project(objAttrs, filter)
+      }
+  }
+
+  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
+    (lhs, rhs) match {
+      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
+      case _ => false
+    }
+  }
+}


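As an aside (not part of the commit), any of these rules can also be exercised in
isolation, since a Rule[LogicalPlan] is just a function from plan to plan. A minimal
sketch, assuming a local SparkSession and the post-patch package layout:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.optimizer.EliminateSerialization

// Illustrative only: build an analyzed plan with a redundant
// serialize/deserialize boundary and apply the rule to it by hand.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rule-by-hand")
  .getOrCreate()
import spark.implicits._

val analyzed = Seq(1, 2, 3).toDS().map(_ + 1).map(_ * 2).queryExecution.analyzed

// Rule[LogicalPlan] exposes apply(plan), so a single pass looks like this;
// the real Optimizer runs the rule to fixed point inside its rule batches.
val afterOnePass = EliminateSerialization(analyzed)
println(afterOnePass.numberedTreeString)
```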