[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-17 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266275717
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.SchemaPruningTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class NestedColumnAliasingSuite extends SchemaPruningTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches = Batch("Nested column pruning", FixedPoint(100),
+  ColumnPruning,
+  CollapseProject,
+  RemoveNoopOperators) :: Nil
+  }
+
+  private val name = StructType.fromDDL("first string, middle string, last 
string")
+  private val employer = StructType.fromDDL("id int, company 
struct")
+  private val contact = LocalRelation(
+'id.int,
+'name.struct(name),
+'address.string,
+'friends.array(name),
+'relatives.map(StringType, name),
+'employer.struct(employer))
+
+  test("Pushing a single nested field projection") {
+testSingleFieldPushDown((input: LogicalPlan) => input.limit(5))
+testSingleFieldPushDown((input: LogicalPlan) => input.repartition(1))
+testSingleFieldPushDown((input: LogicalPlan) => Sample(0.0, 0.6, false, 
11L, input))
+  }
+
+  test("Pushing multiple nested field projection") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, first, last)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first, last)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("function with nested field inputs") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, ConcatWs(Seq(first, last)))
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val aliases = collectGeneratedAliases(optimized)
+
+val expected = contact
+  .select('id, first.as(aliases(0)), last.as(aliases(1)))
+  .limit(5)
+  .select(
+'id,
+ConcatWs(Seq($"${aliases(0)}", 
$"${aliases(1)}")).as("concat_ws(name.first, name.last)"))
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("multi-level nested field") {
+val field1 = GetStructField(GetStructField('employer, 1, Some("company")), 
0, Some("name"))
+val field2 = GetStructField('employer, 0, Some("id"))
+
+val query = contact
+  .limit(5)
+  .select(field1, field2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select(field1, field2)
+  .limit(5)
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("Push original case-sensitive names") {
+val first1 = GetStructField('name, 0, Some("first"))
+val first2 = GetStructField('name, 1, Some("FIRST"))
+
+val query = contact
+  .limit(5)
+  .select('id, first1, first2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first1, first2)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("Pushing a single nested field projection - negative") {
+val ops = Array(
+  (input: LogicalPlan) => input.distribute('name)(1),
+  (input: LogicalPlan) => input.distribute($"name.middle")(1),
+  (input: LogicalPlan) => input.orderBy('name.asc),
+  (input: LogicalPlan) => input.orderBy($"name.middle".asc),

[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-17 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266275500
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.SchemaPruningTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class NestedColumnAliasingSuite extends SchemaPruningTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches = Batch("Nested column pruning", FixedPoint(100),
+  ColumnPruning,
+  CollapseProject,
+  RemoveNoopOperators) :: Nil
+  }
+
+  private val name = StructType.fromDDL("first string, middle string, last 
string")
+  private val employer = StructType.fromDDL("id int, company 
struct")
+  private val contact = LocalRelation(
+'id.int,
+'name.struct(name),
+'address.string,
+'friends.array(name),
+'relatives.map(StringType, name),
+'employer.struct(employer))
+
+  test("Pushing a single nested field projection") {
+testSingleFieldPushDown((input: LogicalPlan) => input.limit(5))
+testSingleFieldPushDown((input: LogicalPlan) => input.repartition(1))
+testSingleFieldPushDown((input: LogicalPlan) => Sample(0.0, 0.6, false, 
11L, input))
+  }
+
+  test("Pushing multiple nested field projection") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, first, last)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first, last)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("function with nested field inputs") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, ConcatWs(Seq(first, last)))
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val aliases = collectGeneratedAliases(optimized)
+
+val expected = contact
+  .select('id, first.as(aliases(0)), last.as(aliases(1)))
+  .limit(5)
+  .select(
+'id,
+ConcatWs(Seq($"${aliases(0)}", 
$"${aliases(1)}")).as("concat_ws(name.first, name.last)"))
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("multi-level nested field") {
+val field1 = GetStructField(GetStructField('employer, 1, Some("company")), 
0, Some("name"))
+val field2 = GetStructField('employer, 0, Some("id"))
+
+val query = contact
+  .limit(5)
+  .select(field1, field2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select(field1, field2)
+  .limit(5)
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("Push original case-sensitive names") {
+val first1 = GetStructField('name, 0, Some("first"))
+val first2 = GetStructField('name, 1, Some("FIRST"))
+
+val query = contact
+  .limit(5)
+  .select('id, first1, first2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first1, first2)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("Pushing a single nested field projection - negative") {
+val ops = Array(
+  (input: LogicalPlan) => input.distribute('name)(1),
+  (input: LogicalPlan) => input.distribute($"name.middle")(1),
+  (input: LogicalPlan) => input.orderBy('name.asc),
+  (input: LogicalPlan) => input.orderBy($"name.middle".asc),

[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-17 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266275407
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child)
+if SQLConf.get.nestedSchemaPruningEnabled && 
canProjectPushThrough(child) =>
 
 Review comment:
   How about pre-checking if `projectList` has at least one `GetStructField` 
here before computing `getAliasSubMap`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-17 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266275085
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child)
+if SQLConf.get.nestedSchemaPruningEnabled && 
canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
 
 Review comment:
   nit: childen -> children


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-17 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266275032
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child)
+if SQLConf.get.nestedSchemaPruningEnabled && 
canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
 
 Review comment:
   I modified this a bit as a refactoring; could you check it? 
https://github.com/dongjoon-hyun/spark/pull/4


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266160056
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ##
 @@ -647,6 +647,10 @@ object ColumnPruning extends Rule[LogicalPlan] {
 // Can't prune the columns on LeafNode
 case p @ Project(_, _: LeafNode) => p
 
+case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases)
+if SQLConf.get.nestedSchemaPruningEnabled =>
 
 Review comment:
   Do we need this check? It seems we've already checked this flag in 
https://github.com/apache/spark/pull/23964/files#diff-43334bab9616cc53e8797b9afa9fc7aaR36?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266159528
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and 
`GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): 
Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases 
pointing it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please 

[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-14 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r265849322
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and 
`GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): 
Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
 
 Review comment:
   super nit: How about `doCollectFunc`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-14 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r265850964
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and 
`GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): 
Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases 
pointing it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
 
 Review comment:
   nit: Drop `.asInstanceOf[Seq[GetStructField]]`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 

[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-14 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r265850994
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and 
`GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): 
Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases 
pointing it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
+  .filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
+  .groupBy(_.references.head)
+  .flatMap { case (attr: Attribute, nestedFields: Seq[GetStructField]) =>
 
 Review comment:
   nit: `.flatMap { case (attr, nestedFields: Seq[GetStructField]) =>`


[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-14 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r265848830
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ##
 @@ -647,6 +647,10 @@ object ColumnPruning extends Rule[LogicalPlan] {
 // Can't prune the columns on LeafNode
 case p @ Project(_, _: LeafNode) => p
 
+case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases)
 
 Review comment:
   We don't need to compute `getAliasSubMap` in `NestedColumnAliasing` if 
`nestedSchemaPruningEnabled` is false, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-14 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r265849055
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and 
`GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): 
Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases 
pointing it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
+  .filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
+  .groupBy(_.references.head)
+  .flatMap { case (attr: Attribute, nestedFields: Seq[GetStructField]) =>
+// Each expression can contain multiple nested fields.
+// Note that we keep the original 

[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-07 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263634734
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute the by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
 
 Review comment:
   Yea, I know the great metric against the additional cost (unnecessary 
aliases)! So, I just wanted to know which one we should cover in this PR, and 
which one we could resolve in future work. If we have TODO tasks, IMO it would 
be better to leave TODO comments where possible.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-07 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263633113
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute the by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases)))
+
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
 
 Review comment:
   Ah, what I'm worried about is that no test fails with/without this entry: 
`case _: LocalLimit => true`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-07 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263275860
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute the by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases)))
+
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
 
 Review comment:
   Can you add a test case for a plan having `LocalLimit` w/o `GlobalLimit`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-07 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263274712
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute the by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
 
 Review comment:
   Do we need this match? IIUC, if this match doesn't exist, this rule generates 
unnecessary aliases between local/global limits. Can't we remove these 
unnecessary aliases in another optimizer rule, e.g., RemoveNoopOperators?
   
   I think this rule could generate unnecessary aliases in other cases, too, 
e.g.,
   ```
   
   scala> spark.table("t").printSchema
   root
    |-- c0: long (nullable = true)
    |-- c1: struct (nullable = true)
    |    |-- n0: long (nullable = true)
    |    |-- n1: long (nullable = true)
   
   scala> spark.table("t").sample(0.5).repartition(1).select("c1.n0").explain(true)
   == Optimized Logical Plan ==
   Project [_gen_alias_36#36L AS n0#35L]
   +- Repartition 1, true
      +- Project [_gen_alias_37#37L AS _gen_alias_36#36L]
         +- Sample 0.0, 0.5, false, 5378592115661057580
            +- Project [c1#25.n0 AS _gen_alias_37#37L]
               +- Relation[c1#25] parquet
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-06 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263259691
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute the by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases)))
+
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Similar to [[QueryPlan.references]], but this only returns all attributes
+   * that are explicitly referenced on the root levels in a [[LogicalPlan]].
+   */
+  private def getRootReferences(plan: LogicalPlan): AttributeSet = {
+def helper(e: Expression): AttributeSet = e match {
+  case attr: AttributeReference => AttributeSet(attr)
+  case _: GetStructField => AttributeSet.empty
+  case es if es.children.nonEmpty => 
AttributeSet(es.children.flatMap(helper))
+  case _ => AttributeSet.empty
+}
+AttributeSet.fromAttributeSets(plan.expressions.map(helper))
+  }
+
+  /**
+   * Returns all the nested fields that are explicitly referenced as 
[[Expression]]
+   * in a [[LogicalPlan]]. Currently, we only support having 
[[GetStructField]] in the chain
+   * of the expressions. If the chain contains GetArrayStructFields, 
GetMapValue, or
+   * GetArrayItem, the nested field substitution will not be performed.
+   */
+  private def getNestedFieldReferences(plan: LogicalPlan): Seq[GetStructField] 
= {
+def helper(e: Expression): Seq[GetStructField] = e match {
+  

[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-06 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263257592
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute the by alias attributes; 
then a project
 
 Review comment:
   typo: `substitute the` -> `substitute them`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-06 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263257374
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
 
 Review comment:
   super nit: " * This aims to handle a nested column aliasing pattern inside 
the `ColumnPruning` optimizer rule."


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-06 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263257374
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
 
 Review comment:
   super nit: ` * This aims to handle a nested column aliasing pattern inside 
the ColumnPruning optimizer rule.`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-06 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263257374
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
 
 Review comment:
   super nit: `` * This aims to handle a nested column aliasing pattern inside 
the `ColumnPruning` optimizer rule.``


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-06 Thread GitBox
maropu commented on a change in pull request #23964: [SPARK-26975][SQL] Support 
nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263257086
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` 
optimizer rule.
+ * If a project or its child references to nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases)))
+
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Similar to [[QueryPlan.references]], but this only returns all attributes
+   * that are explicitly referenced on the root levels in a [[LogicalPlan]].
+   */
+  private def getRootReferences(plan: LogicalPlan): AttributeSet = {
+def helper(e: Expression): AttributeSet = e match {
+  case attr: AttributeReference => AttributeSet(attr)
+  case _: GetStructField => AttributeSet.empty
+  case es if es.children.nonEmpty => 
AttributeSet(es.children.flatMap(helper))
+  case _ => AttributeSet.empty
+}
+AttributeSet.fromAttributeSets(plan.expressions.map(helper))
+  }
+
+  /**
+   * Returns all the nested fields that are explicitly referenced as 
[[Expression]]
+   * in a [[LogicalPlan]]. Currently, we only support having 
[[GetStructField]] in the chain
+   * of the expressions. If the chain contains GetArrayStructFields, 
GetMapValue, or
+   * GetArrayItem, the nested field substitution will not be performed.
+   */
+  private def getNestedFieldReferences(plan: LogicalPlan): Seq[GetStructField] 
= {
+def helper(e: Expression): Seq[GetStructField] = e match {
+