Repository: spark
Updated Branches:
  refs/heads/branch-2.0 94d52d765 -> f91614f36
[SPARK-17270][SQL] Move object optimization rules into its own file (branch-2.0)

## What changes were proposed in this pull request?
As part of breaking Optimizer.scala apart, this patch moves various Dataset
object optimization rules into a single file. I'm submitting separate pull
requests so we can more easily merge this in branch-2.0 to simplify optimizer
backports. This is https://github.com/apache/spark/pull/14839 but for
branch-2.0.

## How was this patch tested?
This should be covered by existing tests.

Author: Reynold Xin <r...@databricks.com>

Closes #14843 from rxin/SPARK-17270-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f91614f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f91614f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f91614f3

Branch: refs/heads/branch-2.0
Commit: f91614f36472957355fad7d69d66327807fe80c8
Parents: 94d52d7
Author: Reynold Xin <r...@databricks.com>
Authored: Sat Aug 27 00:31:49 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Aug 27 00:31:49 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  72 -------------
 .../spark/sql/catalyst/optimizer/objects.scala  | 101 +++++++++++++++++++
 2 files changed, 101 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f91614f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f3f1d21..15d33c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -187,25 +187,6 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
 }
 
 /**
- * Removes cases where we are unnecessarily going between the object and serialized (InternalRow)
- * representation of data item. For example back to back map operations.
- */
-object EliminateSerialization extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
-        if d.outputObjectType == s.inputObjectType =>
-      // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
-      // We will remove it later in RemoveAliasOnlyProject rule.
-      val objAttr =
-        Alias(s.child.output.head, s.child.output.head.name)(exprId = d.output.head.exprId)
-      Project(objAttr :: Nil, s.child)
-    case a @ AppendColumns(_, _, _, s: SerializeFromObject)
-        if a.deserializer.dataType == s.inputObjectType =>
-      AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
-  }
-}
-
-/**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
 object LimitPushDown extends Rule[LogicalPlan] {
@@ -1583,59 +1564,6 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
 }
 
 /**
- * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
- * [[SerializeFromObject]] above it. If these serializations can't be eliminated, we should embed
- * the deserializer in filter condition to save the extra serialization at last.
- */
-object EmbedSerializerInFilter extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
-      // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
-      // optimization rule also relies on this assumption. However, Dataset typed filter operator
-      // does introduce schema changes in some cases. Thus, we only enable this optimization when
-      //
-      // 1. either input and output schemata are exactly the same, or
-      // 2. both input and output schemata are single-field schema and share the same type.
-      //
-      // The 2nd case is included because encoders for primitive types always have only a single
-      // field with hard-coded field name "value".
-      // TODO Cleans this up after fixing SPARK-15632.
-      if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
-
-      val numObjects = condition.collect {
-        case a: Attribute if a == d.output.head => a
-      }.length
-
-      if (numObjects > 1) {
-        // If the filter condition references the object more than one times, we should not embed
-        // deserializer in it as the deserialization will happen many times and slow down the
-        // execution.
-        // TODO: we can still embed it if we can make sure subexpression elimination works here.
-        s
-      } else {
-        val newCondition = condition transform {
-          case a: Attribute if a == d.output.head => d.deserializer
-        }
-        val filter = Filter(newCondition, d.child)
-
-        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
-        // We will remove it later in RemoveAliasOnlyProject rule.
-        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
-          Alias(fout, fout.name)(exprId = sout.exprId)
-        }
-        Project(objAttrs, filter)
-      }
-  }
-
-  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
-    (lhs, rhs) match {
-      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
-      case _ => false
-    }
-  }
-}
-
-/**
  * This rule rewrites predicate sub-queries into left semi/anti joins. The following predicates
  * are supported:
  * a. EXISTS/NOT EXISTS will be rewritten as semi/anti join, unresolved conditions in Filter

http://git-wip-us.apache.org/repos/asf/spark/blob/f91614f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
new file mode 100644
index 0000000..8a25cee
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.types.StructType
+
+/*
+ * This file defines optimization rules related to object manipulation (for the Dataset API).
+ */
+
+
+/**
+ * Removes cases where we are unnecessarily going between the object and serialized (InternalRow)
+ * representation of data item. For example back to back map operations.
+ */
+object EliminateSerialization extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
+        if d.outputObjectType == s.inputObjectType =>
+      // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
+      // We will remove it later in RemoveAliasOnlyProject rule.
+      val objAttr =
+        Alias(s.child.output.head, s.child.output.head.name)(exprId = d.output.head.exprId)
+      Project(objAttr :: Nil, s.child)
+    case a @ AppendColumns(_, _, _, s: SerializeFromObject)
+        if a.deserializer.dataType == s.inputObjectType =>
+      AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
+  }
+}
+
+
+/**
+ * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
+ * [[SerializeFromObject]] above it. If these serializations can't be eliminated, we should embed
+ * the deserializer in filter condition to save the extra serialization at last.
+ */
+object EmbedSerializerInFilter extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
+      // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
+      // optimization rule also relies on this assumption. However, Dataset typed filter operator
+      // does introduce schema changes in some cases. Thus, we only enable this optimization when
+      //
+      // 1. either input and output schemata are exactly the same, or
+      // 2. both input and output schemata are single-field schema and share the same type.
+      //
+      // The 2nd case is included because encoders for primitive types always have only a single
+      // field with hard-coded field name "value".
+      // TODO Cleans this up after fixing SPARK-15632.
+      if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
+
+      val numObjects = condition.collect {
+        case a: Attribute if a == d.output.head => a
+      }.length
+
+      if (numObjects > 1) {
+        // If the filter condition references the object more than one times, we should not embed
+        // deserializer in it as the deserialization will happen many times and slow down the
+        // execution.
+        // TODO: we can still embed it if we can make sure subexpression elimination works here.
+        s
+      } else {
+        val newCondition = condition transform {
+          case a: Attribute if a == d.output.head => d.deserializer
+        }
+        val filter = Filter(newCondition, d.child)
+
+        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
+        // We will remove it later in RemoveAliasOnlyProject rule.
+        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
+          Alias(fout, fout.name)(exprId = sout.exprId)
+        }
+        Project(objAttrs, filter)
+      }
+  }
+
+  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
+    (lhs, rhs) match {
+      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
+      case _ => false
+    }
+  }
+}
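
For reviewers skimming the moved code, here is a minimal sketch of the plan shape `EliminateSerialization` targets, written against the public Dataset API. It is illustrative only: the local `SparkSession` setup and the `EliminateSerializationExample` wrapper are assumptions for the sketch, not part of this patch.

```scala
import org.apache.spark.sql.SparkSession

object EliminateSerializationExample {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; any SparkSession works.
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._

    // Two back-to-back typed maps. Analyzed, each map is wrapped as
    //   SerializeFromObject <- MapElements <- DeserializeToObject
    // so the inner DeserializeToObject(SerializeFromObject(...)) pair between
    // the two maps matches EliminateSerialization and is replaced with an
    // alias-only Project (cleaned up later by RemoveAliasOnlyProject).
    val ds = spark.range(5).as[Long].map(_ + 1).map(_ * 2)

    // Compare the analyzed and optimized plans to see the pair collapse.
    ds.explain(true)

    spark.stop()
  }
}
```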
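And a companion sketch for `EmbedSerializerInFilter`, again illustrative and assuming branch-2.0 behavior, where a typed `filter` still desugars to a deserialize/filter/serialize sandwich:

```scala
import org.apache.spark.sql.SparkSession

object EmbedSerializerInFilterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._

    // A typed filter over primitive values. Analyzed (on branch-2.0), this is roughly
    //   SerializeFromObject <- Filter(<function>) <- DeserializeToObject <- Range
    // The serialized output schema is the single field "value": bigint while the
    // child's schema is "id": bigint, so the two differ only by field name and the
    // samePrimitiveType guard applies (case 2 in the rule's comment). The condition
    // references the object exactly once, so the rule embeds the deserializer into
    // the predicate and drops the surrounding serialize/deserialize pair.
    val evens = spark.range(10).as[Long].filter(_ % 2 == 0)

    // Comparing analyzed vs. optimized plans should show the round trip gone.
    evens.explain(true)

    spark.stop()
  }
}
```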