[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r266275717 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.SchemaPruningTest +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.{StringType, StructType} + +class NestedColumnAliasingSuite extends SchemaPruningTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = Batch("Nested column pruning", FixedPoint(100), + ColumnPruning, + CollapseProject, + RemoveNoopOperators) :: Nil + } + + private val name = StructType.fromDDL("first string, middle string, last string") + private val employer = StructType.fromDDL("id int, company struct") + private val contact = LocalRelation( +'id.int, +'name.struct(name), +'address.string, +'friends.array(name), +'relatives.map(StringType, name), +'employer.struct(employer)) + + test("Pushing a single nested field projection") { +testSingleFieldPushDown((input: LogicalPlan) => input.limit(5)) +testSingleFieldPushDown((input: LogicalPlan) => input.repartition(1)) +testSingleFieldPushDown((input: LogicalPlan) => Sample(0.0, 0.6, false, 11L, input)) + } + + test("Pushing multiple nested field projection") { +val first = GetStructField('name, 0, Some("first")) +val last = GetStructField('name, 2, Some("last")) + +val query = contact + .limit(5) + .select('id, first, last) + .analyze + +val optimized = Optimize.execute(query) + +val expected = contact + .select('id, first, last) + .limit(5) + .analyze + +comparePlans(optimized, expected) + } + + test("function with nested field inputs") { +val first = GetStructField('name, 0, Some("first")) +val last = GetStructField('name, 2, Some("last")) + +val query = contact + .limit(5) + .select('id, ConcatWs(Seq(first, last))) + 
.analyze + +val optimized = Optimize.execute(query) + +val aliases = collectGeneratedAliases(optimized) + +val expected = contact + .select('id, first.as(aliases(0)), last.as(aliases(1))) + .limit(5) + .select( +'id, +ConcatWs(Seq($"${aliases(0)}", $"${aliases(1)}")).as("concat_ws(name.first, name.last)")) + .analyze +comparePlans(optimized, expected) + } + + test("multi-level nested field") { +val field1 = GetStructField(GetStructField('employer, 1, Some("company")), 0, Some("name")) +val field2 = GetStructField('employer, 0, Some("id")) + +val query = contact + .limit(5) + .select(field1, field2) + .analyze + +val optimized = Optimize.execute(query) + +val expected = contact + .select(field1, field2) + .limit(5) + .analyze +comparePlans(optimized, expected) + } + + test("Push original case-sensitive names") { +val first1 = GetStructField('name, 0, Some("first")) +val first2 = GetStructField('name, 1, Some("FIRST")) + +val query = contact + .limit(5) + .select('id, first1, first2) + .analyze + +val optimized = Optimize.execute(query) + +val expected = contact + .select('id, first1, first2) + .limit(5) + .analyze + +comparePlans(optimized, expected) + } + + test("Pushing a single nested field projection - negative") { +val ops = Array( + (input: LogicalPlan) => input.distribute('name)(1), + (input: LogicalPlan) => input.distribute($"name.middle")(1), + (input: LogicalPlan) => input.orderBy('name.asc), + (input: LogicalPlan) => input.orderBy($"name.middle".asc),
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r266275500 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.SchemaPruningTest +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.{StringType, StructType} + +class NestedColumnAliasingSuite extends SchemaPruningTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = Batch("Nested column pruning", FixedPoint(100), + ColumnPruning, + CollapseProject, + RemoveNoopOperators) :: Nil + } + + private val name = StructType.fromDDL("first string, middle string, last string") + private val employer = StructType.fromDDL("id int, company struct") + private val contact = LocalRelation( +'id.int, +'name.struct(name), +'address.string, +'friends.array(name), +'relatives.map(StringType, name), +'employer.struct(employer)) + + test("Pushing a single nested field projection") { +testSingleFieldPushDown((input: LogicalPlan) => input.limit(5)) +testSingleFieldPushDown((input: LogicalPlan) => input.repartition(1)) +testSingleFieldPushDown((input: LogicalPlan) => Sample(0.0, 0.6, false, 11L, input)) + } + + test("Pushing multiple nested field projection") { +val first = GetStructField('name, 0, Some("first")) +val last = GetStructField('name, 2, Some("last")) + +val query = contact + .limit(5) + .select('id, first, last) + .analyze + +val optimized = Optimize.execute(query) + +val expected = contact + .select('id, first, last) + .limit(5) + .analyze + +comparePlans(optimized, expected) + } + + test("function with nested field inputs") { +val first = GetStructField('name, 0, Some("first")) +val last = GetStructField('name, 2, Some("last")) + +val query = contact + .limit(5) + .select('id, ConcatWs(Seq(first, last))) + 
.analyze + +val optimized = Optimize.execute(query) + +val aliases = collectGeneratedAliases(optimized) + +val expected = contact + .select('id, first.as(aliases(0)), last.as(aliases(1))) + .limit(5) + .select( +'id, +ConcatWs(Seq($"${aliases(0)}", $"${aliases(1)}")).as("concat_ws(name.first, name.last)")) + .analyze +comparePlans(optimized, expected) + } + + test("multi-level nested field") { +val field1 = GetStructField(GetStructField('employer, 1, Some("company")), 0, Some("name")) +val field2 = GetStructField('employer, 0, Some("id")) + +val query = contact + .limit(5) + .select(field1, field2) + .analyze + +val optimized = Optimize.execute(query) + +val expected = contact + .select(field1, field2) + .limit(5) + .analyze +comparePlans(optimized, expected) + } + + test("Push original case-sensitive names") { +val first1 = GetStructField('name, 0, Some("first")) +val first2 = GetStructField('name, 1, Some("FIRST")) + +val query = contact + .limit(5) + .select('id, first1, first2) + .analyze + +val optimized = Optimize.execute(query) + +val expected = contact + .select('id, first1, first2) + .limit(5) + .analyze + +comparePlans(optimized, expected) + } + + test("Pushing a single nested field projection - negative") { +val ops = Array( + (input: LogicalPlan) => input.distribute('name)(1), + (input: LogicalPlan) => input.distribute($"name.middle")(1), + (input: LogicalPlan) => input.orderBy('name.asc), + (input: LogicalPlan) => input.orderBy($"name.middle".asc),
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r266275407 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +/** + * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute them by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) +if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) => Review comment: How about pre-checking whether `projectList` has at least one `GetStructField` here before computing `getAliasSubMap`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r266275085 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +/** + * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute them by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) +if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. Review comment: nit: childen -> children This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r266275032 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +/** + * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute them by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) +if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) => + getAliasSubMap(plan, child) Review comment: I refactored this a bit; could you check? https://github.com/dongjoon-hyun/spark/pull/4 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r266160056 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ## @@ -647,6 +647,10 @@ object ColumnPruning extends Rule[LogicalPlan] { // Can't prune the columns on LeafNode case p @ Project(_, _: LeafNode) => p +case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases) +if SQLConf.get.nestedSchemaPruningEnabled => Review comment: Do we need this check? It seems we've already checked this flag in https://github.com/apache/spark/pull/23964/files#diff-43334bab9616cc53e8797b9afa9fc7aaR36. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r266159528 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute them by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. + */ + private def replaceChildrenWithAliases( + plan: LogicalPlan, + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { +plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan) +}) + } + + /** + * Returns true for those operators that project can be pushed through. + */ + private def canProjectPushThrough(plan: LogicalPlan) = plan match { +case _: GlobalLimit => true +case _: LocalLimit => true +case _: Repartition => true +case _: Sample => true +case _ => false + } + + /** + * Return root references that are individually accessed as a whole, and `GetStructField`s. 
+ */ + private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = { +def helper(e: Expression): Seq[Expression] = e match { + case _: AttributeReference | _: GetStructField => Seq(e) + case es if es.children.nonEmpty => es.children.flatMap(helper) + case _ => Seq.empty +} +plan.expressions.flatMap(helper) + } + + /** + * Return two maps in order to replace nested fields to aliases. + * + * 1. GetStructField -> Alias: A new alias is created for each nested field. + * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it. + */ + private def getAliasSubMap(plans: LogicalPlan*) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = { +val (nestedFieldReferences, otherRootReferences) = plans + .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition { +case _: GetStructField => true +case _ => false + } + +val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]] Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r265849322 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute them by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. + */ + private def replaceChildrenWithAliases( + plan: LogicalPlan, + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { +plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan) +}) + } + + /** + * Returns true for those operators that project can be pushed through. + */ + private def canProjectPushThrough(plan: LogicalPlan) = plan match { +case _: GlobalLimit => true +case _: LocalLimit => true +case _: Repartition => true +case _: Sample => true +case _ => false + } + + /** + * Return root references that are individually accessed as a whole, and `GetStructField`s. + */ + private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = { +def helper(e: Expression): Seq[Expression] = e match { Review comment: super nit: How about `doCollectFunc`? 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r265850964 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute them by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. + */ + private def replaceChildrenWithAliases( + plan: LogicalPlan, + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { +plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan) +}) + } + + /** + * Returns true for those operators that project can be pushed through. + */ + private def canProjectPushThrough(plan: LogicalPlan) = plan match { +case _: GlobalLimit => true +case _: LocalLimit => true +case _: Repartition => true +case _: Sample => true +case _ => false + } + + /** + * Return root references that are individually accessed as a whole, and `GetStructField`s. 
+ */ + private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = { +def helper(e: Expression): Seq[Expression] = e match { + case _: AttributeReference | _: GetStructField => Seq(e) + case es if es.children.nonEmpty => es.children.flatMap(helper) + case _ => Seq.empty +} +plan.expressions.flatMap(helper) + } + + /** + * Return two maps in order to replace nested fields to aliases. + * + * 1. GetStructField -> Alias: A new alias is created for each nested field. + * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it. + */ + private def getAliasSubMap(plans: LogicalPlan*) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = { +val (nestedFieldReferences, otherRootReferences) = plans + .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition { +case _: GetStructField => true +case _ => false + } + +val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]] Review comment: nit: Drop `.asInstanceOf[Seq[GetStructField]]`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r265850994 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute them by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. + */ + private def replaceChildrenWithAliases( + plan: LogicalPlan, + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { +plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan) +}) + } + + /** + * Returns true for those operators that project can be pushed through. + */ + private def canProjectPushThrough(plan: LogicalPlan) = plan match { +case _: GlobalLimit => true +case _: LocalLimit => true +case _: Repartition => true +case _: Sample => true +case _ => false + } + + /** + * Return root references that are individually accessed as a whole, and `GetStructField`s. 
+ */ + private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = { +def helper(e: Expression): Seq[Expression] = e match { + case _: AttributeReference | _: GetStructField => Seq(e) + case es if es.children.nonEmpty => es.children.flatMap(helper) + case _ => Seq.empty +} +plan.expressions.flatMap(helper) + } + + /** + * Return two maps in order to replace nested fields to aliases. + * + * 1. GetStructField -> Alias: A new alias is created for each nested field. + * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it. + */ + private def getAliasSubMap(plans: LogicalPlan*) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = { +val (nestedFieldReferences, otherRootReferences) = plans + .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition { +case _: GetStructField => true +case _ => false + } + +val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]] + .filter(!_.references.subsetOf(AttributeSet(otherRootReferences))) + .groupBy(_.references.head) + .flatMap { case (attr: Attribute, nestedFields: Seq[GetStructField]) => Review comment: nit: `.flatMap { case (attr, nestedFields: Seq[GetStructField]) =>`
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r265848830 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ## @@ -647,6 +647,10 @@ object ColumnPruning extends Rule[LogicalPlan] { // Can't prune the columns on LeafNode case p @ Project(_, _: LeafNode) => p +case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases) Review comment: We don't need to compute `getAliasSubMap` in `NestedColumnAliasing` if `nestedSchemaPruningEnabled` is false, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r265849055 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute them by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. + */ + private def replaceChildrenWithAliases( + plan: LogicalPlan, + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { +plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan) +}) + } + + /** + * Returns true for those operators that project can be pushed through. + */ + private def canProjectPushThrough(plan: LogicalPlan) = plan match { +case _: GlobalLimit => true +case _: LocalLimit => true +case _: Repartition => true +case _: Sample => true +case _ => false + } + + /** + * Return root references that are individually accessed as a whole, and `GetStructField`s. 
+ */ + private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = { +def helper(e: Expression): Seq[Expression] = e match { + case _: AttributeReference | _: GetStructField => Seq(e) + case es if es.children.nonEmpty => es.children.flatMap(helper) + case _ => Seq.empty +} +plan.expressions.flatMap(helper) + } + + /** + * Return two maps in order to replace nested fields to aliases. + * + * 1. GetStructField -> Alias: A new alias is created for each nested field. + * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it. + */ + private def getAliasSubMap(plans: LogicalPlan*) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = { +val (nestedFieldReferences, otherRootReferences) = plans + .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition { +case _: GetStructField => true +case _ => false + } + +val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]] + .filter(!_.references.subsetOf(AttributeSet(otherRootReferences))) + .groupBy(_.references.head) + .flatMap { case (attr: Attribute, nestedFields: Seq[GetStructField]) => +// Each expression can contain multiple nested fields. +// Note that we keep the original
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263634734 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute the by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) => Review comment: Yeah, I know the great metric against the additional cost (unnecessary aliases)! So, I just wanted to know which one we should cover in this PR, and which one we could resolve in future work. If we have TODO tasks, IMO it would be better to leave TODO comments where possible. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263633113 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute the by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases))) + +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. + */ + private def replaceChildrenWithAliases( + plan: LogicalPlan, + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { +plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan) +}) + } + + /** + * Returns true for those operators that project can be pushed through. + */ + private def canProjectPushThrough(plan: LogicalPlan) = plan match { +case _: GlobalLimit => true +case _: LocalLimit => true Review comment: Ah, what I'm worried about is that no test fails with/without this entry: `case _: LocalLimit => true`. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263275860 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute the by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases))) + +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. + */ + private def replaceChildrenWithAliases( + plan: LogicalPlan, + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { +plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan) +}) + } + + /** + * Returns true for those operators that project can be pushed through. + */ + private def canProjectPushThrough(plan: LogicalPlan) = plan match { +case _: GlobalLimit => true +case _: LocalLimit => true Review comment: Can you add a test case for a plan having `LocalLimit` w/o `GlobalLimit`? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263274712 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute the by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) => Review comment: Do we need this match? IIUC, if this match doesn't exist, this rule generates unnecessary aliases between local/global limits. Can't we remove these unnecessary aliases in another optimizer rule, e.g., RemoveNoopOperators? I think this rule could generate unnecessary aliases in other cases, too, e.g., ``` scala> spark.table("t").printSchema root |-- c0: long (nullable = true) |-- c1: struct (nullable = true) ||-- n0: long (nullable = true) ||-- n1: long (nullable = true) scala> spark.table("t").sample(0.5).repartition(1).select("c1.n0").explain(true) == Optimized Logical Plan == Project [_gen_alias_36#36L AS n0#35L] +- Repartition 1, true +- Project [_gen_alias_37#37L AS _gen_alias_36#36L] +- Sample 0.0, 0.5, false, 5378592115661057580 +- Project [c1#25.n0 AS _gen_alias_37#37L] +- Relation[c1#25] parquet ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263259691 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute the by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases))) + +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. + */ + private def replaceChildrenWithAliases( + plan: LogicalPlan, + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { +plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan) +}) + } + + /** + * Returns true for those operators that project can be pushed through. 
+ */ + private def canProjectPushThrough(plan: LogicalPlan) = plan match { +case _: GlobalLimit => true +case _: LocalLimit => true +case _: Repartition => true +case _: Sample => true +case _ => false + } + + /** + * Similar to [[QueryPlan.references]], but this only returns all attributes + * that are explicitly referenced on the root levels in a [[LogicalPlan]]. + */ + private def getRootReferences(plan: LogicalPlan): AttributeSet = { +def helper(e: Expression): AttributeSet = e match { + case attr: AttributeReference => AttributeSet(attr) + case _: GetStructField => AttributeSet.empty + case es if es.children.nonEmpty => AttributeSet(es.children.flatMap(helper)) + case _ => AttributeSet.empty +} +AttributeSet.fromAttributeSets(plan.expressions.map(helper)) + } + + /** + * Returns all the nested fields that are explicitly referenced as [[Expression]] + * in a [[LogicalPlan]]. Currently, we only support having [[GetStructField]] in the chain + * of the expressions. If the chain contains GetArrayStructFields, GetMapValue, or + * GetArrayItem, the nested field substitution will not be performed. + */ + private def getNestedFieldReferences(plan: LogicalPlan): Seq[GetStructField] = { +def helper(e: Expression): Seq[GetStructField] = e match { +
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263257592 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute the by alias attributes; then a project Review comment: typo: `substitute the` -> `substitute them` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263257374 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. Review comment: super nit: " * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule." This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263257374 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. Review comment: super nit: ` * This aims to handle a nested column aliasing pattern inside the ColumnPruning optimizer rule.` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263257374 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. Review comment: super nit: ` * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition URL: https://github.com/apache/spark/pull/23964#discussion_r263257086 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types._ + +/** + * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule. + * If a project or its child references to nested fields, and not all the fields + * in a nested attribute are used, we can substitute the by alias attributes; then a project + * of the nested fields as aliases on the children of the child will be created. 
+ */ +object NestedColumnAliasing { + + def unapply(plan: LogicalPlan) +: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match { +case Project(_, child) if canProjectPushThrough(child) => + getAliasSubMap(plan, child) +case _ => None + } + + /** + * Replace nested columns to prune unused nested columns later. + */ + def replaceToAliases( + plan: LogicalPlan, + nestedFieldToAlias: Map[GetStructField, Alias], + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match { +case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases))) + +case Project(projectList, child) => + Project( +getNewProjectList(projectList, nestedFieldToAlias), +replaceChildrenWithAliases(child, attrToAliases)) + } + + /** + * Return a replaced project list. + */ + private def getNewProjectList( + projectList: Seq[NamedExpression], + nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = { +projectList.map(_.transform { + case f: GetStructField if nestedFieldToAlias.contains(f) => +nestedFieldToAlias(f).toAttribute +}.asInstanceOf[NamedExpression]) + } + + /** + * Return a plan with new childen replaced with aliases. + */ + private def replaceChildrenWithAliases( + plan: LogicalPlan, + attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { +plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan) +}) + } + + /** + * Returns true for those operators that project can be pushed through. 
+ */ + private def canProjectPushThrough(plan: LogicalPlan) = plan match { +case _: GlobalLimit => true +case _: LocalLimit => true +case _: Repartition => true +case _: Sample => true +case _ => false + } + + /** + * Similar to [[QueryPlan.references]], but this only returns all attributes + * that are explicitly referenced on the root levels in a [[LogicalPlan]]. + */ + private def getRootReferences(plan: LogicalPlan): AttributeSet = { +def helper(e: Expression): AttributeSet = e match { + case attr: AttributeReference => AttributeSet(attr) + case _: GetStructField => AttributeSet.empty + case es if es.children.nonEmpty => AttributeSet(es.children.flatMap(helper)) + case _ => AttributeSet.empty +} +AttributeSet.fromAttributeSets(plan.expressions.map(helper)) + } + + /** + * Returns all the nested fields that are explicitly referenced as [[Expression]] + * in a [[LogicalPlan]]. Currently, we only support having [[GetStructField]] in the chain + * of the expressions. If the chain contains GetArrayStructFields, GetMapValue, or + * GetArrayItem, the nested field substitution will not be performed. + */ + private def getNestedFieldReferences(plan: LogicalPlan): Seq[GetStructField] = { +def helper(e: Expression): Seq[GetStructField] = e match { +