[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-04-01 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r271109162
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+    : Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+    case Project(projectList, child)
+        if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) =>
 
 Review comment:
   Hi, @HyukjinKwon . Actually, this is the 3rd try for this. At every PR, the same question about `a separate rule` comes up. :)
   
   1. 1st PR: https://github.com/apache/spark/pull/23542#discussion_r248095908
   2. 2nd PR: https://github.com/apache/spark/pull/23873#discussion_r261768034
   3. 3rd one: Yours.
   
   In short, this should be here. Otherwise, by the time a separate rule runs, the projection has already been moved down as the whole attribute instead of a subset of its fields.
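
The difference described above can be sketched with toy plan nodes. This is only an illustration under stated assumptions: `Node`, `Scan`, `Take`, `Select`, and both functions are hypothetical stand-ins, not Spark's Catalyst classes or the PR's implementation. It contrasts pushing the whole root attribute below a limit with pushing only the referenced nested fields.

```scala
// Toy plan nodes for illustration only; these are not Spark's Catalyst classes.
sealed trait Node
final case class Scan(columns: Seq[String]) extends Node
final case class Take(n: Int, child: Node) extends Node
final case class Select(fields: Seq[String], child: Node) extends Node

object PushDownSketch {
  // Root attribute of a dotted field path, e.g. "name.first" -> "name".
  private def root(field: String): String = field.takeWhile(_ != '.')

  // Generic column pruning (hypothetical): only whole root attributes go below the limit.
  def pushWholeAttributes(plan: Node): Node = plan match {
    case Select(fields, Take(n, child)) =>
      Select(fields, Take(n, Select(fields.map(root).distinct, child)))
    case other => other
  }

  // Nested-aware pruning (hypothetical): the referenced nested fields go below the limit.
  def pushNestedFields(plan: Node): Node = plan match {
    case Select(fields, Take(n, child)) => Take(n, Select(fields, child))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val query = Select(Seq("id", "name.first"), Take(5, Scan(Seq("id", "name", "address"))))
    println(pushWholeAttributes(query))
    println(pushNestedFields(query))
  }
}
```

In the first result the limit still carries the entire `name` struct; in the second only `name.first` survives below it, which is the shape the test suite in this PR expects.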


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-19 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266954719
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+    : Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+    case Project(projectList, child)
+        if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) =>
 
 Review comment:
   Yep. I don't think that blocks this.





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-18 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266561086
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+    : Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+    case Project(projectList, child)
+        if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) =>
 
 Review comment:
   Thank you for the review, @viirya !
   1. Do you mean JSON and AVRO?
   > For example, when reading from data sources don't support nested schema pruning.
   2. For the following,
   > Shall we apply it when pruning nested fields happens actually?
   
   This line is in `unapply`, which is used [in the following pattern](https://github.com/apache/spark/pull/23964/files#diff-a636a87d8843eeccca90140be91d4fafR650) inside the `ColumnPruning` optimizer rule. `NestedColumnAliasing.replaceToAliases` is not called if `getAliasSubMap` returns `None`.
   ```scala
   case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases) =>
     NestedColumnAliasing.replaceToAliases(p, nestedFieldToAlias, attrToAliases)
   ```
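
The extractor behaviour described in this comment can be sketched in isolation. This is a minimal sketch with toy types: `Plan`, `Relation`, `Limit`, `Project`, and `NestedFieldsOverLimit` are hypothetical stand-ins, not the Catalyst classes or the PR's `NestedColumnAliasing`. It shows that when `unapply` returns `None`, the `case` does not match and the rewrite on its right-hand side never runs.

```scala
// Toy stand-ins for plan nodes; these are NOT the real Spark classes.
sealed trait Plan
final case class Relation(columns: Seq[String]) extends Plan
final case class Limit(n: Int, child: Plan) extends Plan
final case class Project(fields: Seq[String], child: Plan) extends Plan

// Hypothetical extractor: matches only a Project over a Limit that reads
// at least one dotted (nested) field such as "name.first".
object NestedFieldsOverLimit {
  def unapply(plan: Plan): Option[Seq[String]] = plan match {
    case Project(fields, Limit(_, _)) =>
      val nested = fields.filter(_.contains("."))
      if (nested.isEmpty) None else Some(nested)
    case _ => None
  }
}

object ExtractorDemo {
  // The right-hand side of the first case runs only when unapply returns Some.
  def describe(plan: Plan): String = plan match {
    case NestedFieldsOverLimit(nested) => s"rewrite: ${nested.mkString(", ")}"
    case _ => "unchanged"
  }

  def main(args: Array[String]): Unit = {
    val base = Limit(5, Relation(Seq("id", "name")))
    println(describe(Project(Seq("id", "name.first"), base))) // rewrite: name.first
    println(describe(Project(Seq("id"), base)))               // unchanged
  }
}
```

Keeping the applicability check inside `unapply` means the calling rule needs no separate guard: a plan with nothing to alias simply falls through to the next `case`.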





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-17 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266296640
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.SchemaPruningTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class NestedColumnAliasingSuite extends SchemaPruningTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches = Batch("Nested column pruning", FixedPoint(100),
+  ColumnPruning,
+  CollapseProject,
+  RemoveNoopOperators) :: Nil
+  }
+
+  private val name = StructType.fromDDL("first string, middle string, last string")
+  private val employer = StructType.fromDDL("id int, company struct")
+  private val contact = LocalRelation(
+'id.int,
+'name.struct(name),
+'address.string,
+'friends.array(name),
+'relatives.map(StringType, name),
+'employer.struct(employer))
+
+  test("Pushing a single nested field projection") {
+testSingleFieldPushDown((input: LogicalPlan) => input.limit(5))
+testSingleFieldPushDown((input: LogicalPlan) => input.repartition(1))
+testSingleFieldPushDown((input: LogicalPlan) => Sample(0.0, 0.6, false, 11L, input))
+  }
+
+  test("Pushing multiple nested field projection") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, first, last)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first, last)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("function with nested field inputs") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, ConcatWs(Seq(first, last)))
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val aliases = collectGeneratedAliases(optimized)
+
+val expected = contact
+  .select('id, first.as(aliases(0)), last.as(aliases(1)))
+  .limit(5)
+  .select(
+'id,
+ConcatWs(Seq($"${aliases(0)}", $"${aliases(1)}")).as("concat_ws(name.first, name.last)"))
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("multi-level nested field") {
+val field1 = GetStructField(GetStructField('employer, 1, Some("company")), 0, Some("name"))
+val field2 = GetStructField('employer, 0, Some("id"))
+
+val query = contact
+  .limit(5)
+  .select(field1, field2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select(field1, field2)
+  .limit(5)
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("Push original case-sensitive names") {
+val first1 = GetStructField('name, 0, Some("first"))
+val first2 = GetStructField('name, 1, Some("FIRST"))
+
+val query = contact
+  .limit(5)
+  .select('id, first1, first2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first1, first2)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("Pushing a single nested field projection - negative") {
+val ops = Array(
+  (input: LogicalPlan) => input.distribute('name)(1),
+  (input: LogicalPlan) => input.distribute($"name.middle")(1),
+  (input: LogicalPlan) => input.orderBy('name.asc),
+  (input: LogicalPlan) => 

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-17 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266295808
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.SchemaPruningTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class NestedColumnAliasingSuite extends SchemaPruningTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches = Batch("Nested column pruning", FixedPoint(100),
+  ColumnPruning,
+  CollapseProject,
+  RemoveNoopOperators) :: Nil
+  }
+
+  private val name = StructType.fromDDL("first string, middle string, last string")
+  private val employer = StructType.fromDDL("id int, company struct")
+  private val contact = LocalRelation(
+'id.int,
+'name.struct(name),
+'address.string,
+'friends.array(name),
+'relatives.map(StringType, name),
+'employer.struct(employer))
+
+  test("Pushing a single nested field projection") {
+testSingleFieldPushDown((input: LogicalPlan) => input.limit(5))
+testSingleFieldPushDown((input: LogicalPlan) => input.repartition(1))
+testSingleFieldPushDown((input: LogicalPlan) => Sample(0.0, 0.6, false, 11L, input))
+  }
+
+  test("Pushing multiple nested field projection") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, first, last)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first, last)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("function with nested field inputs") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, ConcatWs(Seq(first, last)))
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val aliases = collectGeneratedAliases(optimized)
+
+val expected = contact
+  .select('id, first.as(aliases(0)), last.as(aliases(1)))
+  .limit(5)
+  .select(
+'id,
+ConcatWs(Seq($"${aliases(0)}", $"${aliases(1)}")).as("concat_ws(name.first, name.last)"))
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("multi-level nested field") {
+val field1 = GetStructField(GetStructField('employer, 1, Some("company")), 0, Some("name"))
+val field2 = GetStructField('employer, 0, Some("id"))
+
+val query = contact
+  .limit(5)
+  .select(field1, field2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select(field1, field2)
+  .limit(5)
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("Push original case-sensitive names") {
+val first1 = GetStructField('name, 0, Some("first"))
+val first2 = GetStructField('name, 1, Some("FIRST"))
+
+val query = contact
+  .limit(5)
+  .select('id, first1, first2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first1, first2)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("Pushing a single nested field projection - negative") {
+val ops = Array(
+  (input: LogicalPlan) => input.distribute('name)(1),
+  (input: LogicalPlan) => input.distribute($"name.middle")(1),
+  (input: LogicalPlan) => input.orderBy('name.asc),
+  (input: LogicalPlan) => 

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-17 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266295503
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child)
+if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) =>
 
 Review comment:
   The complexity will be the same, since `getAliasSubMap` does the `partition`
on its first line and the rest of the logic will not be executed. We need
`collectRootReferenceAndGetStructField` here, too.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-17 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266294562
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child)
+if SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
 
 Review comment:
   That looks better to me. I merged it. Thank you so much.





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266164061
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ##
 @@ -647,6 +647,10 @@ object ColumnPruning extends Rule[LogicalPlan] {
 // Can't prune the columns on LeafNode
 case p @ Project(_, _: LeafNode) => p
 
+case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases)
+if SQLConf.get.nestedSchemaPruningEnabled =>
 
 Review comment:
   Thanks. Yes. Since it was moved there recently, we can remove it from here.





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266160003
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and `GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing to it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
 
 Review comment:
   Thanks!



[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266104512
 
 

 ##
 File path: sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
 ##
 @@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Selection:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      145            174          23         6.9         145.1       1.0X
-Nested column                         325            346          19         3.1         324.8       0.4X
+Top-level column                      128            166          24         7.8         128.0       1.0X
+Nested column                         308            325          10         3.2         308.3       0.4X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Limiting:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      434            508         108         2.3         434.3       1.0X
-Nested column                         625            647          23         1.6         624.8       0.7X
+Top-level column                      447            496          91         2.2         447.0       1.0X
+Nested column                         631            666          40         1.6         631.2       0.7X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning:             Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      357            368           9         2.8         356.9       1.0X
-Nested column                        2897           2976          88         0.3        2897.4       0.1X
+Top-level column                      360            394          84         2.8         360.0       1.0X
+Nested column                         553            586          65         1.8         553.5       0.7X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning by exprs:    Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      365            413          77         2.7         364.9       1.0X
-Nested column                        2902           2969          99         0.3        2902.4       0.1X
+Top-level column                      368            393          50         2.7         368.3       1.0X
+Nested column                        2942           3017          82         0.3        2942.0       0.1X
+
+OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Sample:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------
+Top-level column                      124            143          10         8.1         124.1       1.0X
+Nested column                         345            366          34         2.9         344.8       0.4X
 
 Review comment:
   I added a new benchmark case for `Sample`. This is the result after the
improvement.



[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266104232
 
 

 ##
 File path: sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
 ##
 @@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Selection:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      145            174          23         6.9         145.1       1.0X
-Nested column                         325            346          19         3.1         324.8       0.4X
+Top-level column                      128            166          24         7.8         128.0       1.0X
+Nested column                         308            325          10         3.2         308.3       0.4X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Limiting:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      434            508         108         2.3         434.3       1.0X
-Nested column                         625            647          23         1.6         624.8       0.7X
+Top-level column                      447            496          91         2.2         447.0       1.0X
+Nested column                         631            666          40         1.6         631.2       0.7X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning:             Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      357            368           9         2.8         356.9       1.0X
-Nested column                        2897           2976          88         0.3        2897.4       0.1X
+Top-level column                      360            394          84         2.8         360.0       1.0X
+Nested column                         553            586          65         1.8         553.5       0.7X
 
 Review comment:
   This becomes faster: average time `2976ms` -> `586ms`.





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266103957
 
 

 ##
 File path: sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt
 ##
 @@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Selection:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      120            148          24         8.3         120.0       1.0X
-Nested column                        2367           2415          43         0.4        2367.0       0.1X
+Top-level column                      135            169          19         7.4         134.7       1.0X
+Nested column                        2131           2216          95         0.5        2131.4       0.1X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Limiting:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      129            153          16         7.8         128.5       1.0X
-Nested column                        2368           2400          32         0.4        2367.7       0.1X
+Top-level column                      147            158          10         6.8         146.9       1.0X
+Nested column                        2149           2204          50         0.5        2148.9       0.1X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning:             Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      359            396          59         2.8         358.9       1.0X
-Nested column                        4100           4147          59         0.2        4099.9       0.1X
+Top-level column                      386            399          16         2.6         385.8       1.0X
+Nested column                        2612           2666          57         0.4        2612.2       0.1X
 
 Review comment:
   Since this PR applies to all data sources, `ORC v2` also becomes faster:
average time `4147ms` -> `2666ms`.





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266103343
 
 

 ##
 File path: sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt
 ##
 @@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Selection:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      117            154          23         8.5         117.5       1.0X
-Nested column                        1271           1295          26         0.8        1270.5       0.1X
+Top-level column                      131            150          25         7.7         130.6       1.0X
+Nested column                         922            954          21         1.1         922.2       0.1X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Limiting:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      431            488          73         2.3         431.2       1.0X
-Nested column                        1738           1777          24         0.6        1738.3       0.2X
+Top-level column                      446            477          50         2.2         445.5       1.0X
+Nested column                        1328           1366          44         0.8        1328.4       0.3X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning:             Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------
-Top-level column                      349            381          87         2.9         348.7       1.0X
-Nested column                        4374           4456         125         0.2        4373.6       0.1X
+Top-level column                      357            386          33         2.8         356.8       1.0X
+Nested column                        1266           1274           7         0.8        1266.3       0.3X
 
 Review comment:
   This becomes 3 times faster.
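
   As a rough cross-check of that figure, here is a minimal sketch that
recomputes the speedups from the `Nested column` Avg Time(ms) values in the
hunk quoted above. `SpeedupCheck` and `speedup` are hypothetical names used
only for illustration; they are not part of this PR or of Spark's benchmark
code, and comparing on Avg Time(ms) rather than Best Time(ms) is an assumption.

```scala
// Hypothetical helper for illustration only; not part of the PR.
object SpeedupCheck {

  /** Ratio of the old time to the new time; values above 1.0 mean "faster". */
  def speedup(beforeMs: Double, afterMs: Double): Double = beforeMs / afterMs

  // Avg Time(ms) of the "Nested column" rows in the ORC results (before, after).
  val nestedAvgMs: Seq[(String, (Double, Double))] = Seq(
    "Selection"      -> (1295.0, 954.0),
    "Limiting"       -> (1777.0, 1366.0),
    "Repartitioning" -> (4456.0, 1274.0))

  def main(args: Array[String]): Unit = {
    nestedAvgMs.foreach { case (name, (before, after)) =>
      // Print each case with its speedup rounded to two decimals.
      println(f"$name%-15s ${speedup(before, after)}%.2fx")
    }
  }
}
```

   Under that assumption the `Repartitioning` case comes out at about 3.5x
(4456 ms -> 1274 ms), while `Selection` and `Limiting` improve by a smaller
factor.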





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266049288
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and `GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing to it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
 
 Review comment:
   Oops. I forgot that I added this to work around a Scala issue. We need it
here for the `flatMap` below.
   ```
   [error] [warn] 
/Users/dongjoon/PRS/SPARK-26975-ALIAS/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala:117:
 

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266049288
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and `GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing to it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
 
 Review comment:
   Oops. I forgot that I added this to work around a Scala issue. We need it
here for the `flatMap` below.
   ```
   [error] [warn] 
/.../sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala:117:
 non-variable type argument 

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266049288
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and `GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
 
 Review comment:
   Oops. I forgot that I added that for Scala issue. We need this here for the 
below `flatMap`.
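A minimal sketch of the kind of Scala issue this probably refers to, not the PR's actual code. `partition` keeps the static element type of its input, so both halves are `Seq[Expression]`; a typed pattern such as `nestedFields: Seq[GetStructField]` in the later `flatMap` then cannot be checked at runtime because of erasure, and the compiler warns. That this is the warning in the truncated log below is an assumption. `Expr`, `Attr`, and `Field` are hypothetical stand-ins for Catalyst's `Expression`, `AttributeReference`, and `GetStructField`.

```scala
object CastBeforeFlatMap {
  // Hypothetical stand-ins for Catalyst's expression classes (not Spark types).
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Field(root: Attr, name: String) extends Expr

  // Split mixed expressions into nested-field accesses and everything else.
  // `partition` keeps the static element type, so both halves are Seq[Expr].
  def split(exprs: Seq[Expr]): (Seq[Expr], Seq[Expr]) =
    exprs.partition {
      case _: Field => true
      case _ => false
    }

  // The cast narrows Seq[Expr] to Seq[Field] once, so the tuple pattern in
  // `flatMap` needs no (unchecked) type test on the element type of the Seq.
  def fieldNamesByRoot(exprs: Seq[Expr]): Map[String, Seq[String]] = {
    val (nested, _) = split(exprs)
    nested.asInstanceOf[Seq[Field]]
      .groupBy(_.root)
      .flatMap { case (root, fields) =>
        Seq(root.name -> fields.map(_.name))
      }
  }
}
```

The cast is safe here only because the preceding `partition` guarantees that every element of the first half is a `Field`.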
   ```
   [error] [warn] 
/Users/dongjoon/PRS/SPARK-26975-ALIAS/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala:117:
 

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266046929
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and `GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
+  .filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
+  .groupBy(_.references.head)
+  .flatMap { case (attr: Attribute, nestedFields: Seq[GetStructField]) =>
 
 Review comment:
   Sure.
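
For readers following the quoted `filter`/`groupBy`/`flatMap` chain, here is a simplified sketch of the two maps described in the doc comment of `getAliasSubMap`. The body of the `flatMap` is cut off in this message, so this is not the PR's code: `Attr`, `Field`, `AliasOf`, the `alias_N` names, and returning `None` for an empty result are all assumptions made for illustration.

```scala
object AliasMapsSketch {
  // Hypothetical stand-ins for Catalyst's AttributeReference, GetStructField and Alias.
  final case class Attr(exprId: Long, name: String)
  final case class Field(root: Attr, name: String)
  final case class AliasOf(field: Field, aliasName: String)

  // Returns (field -> alias, root exprId -> aliases).
  // Returning None when nothing can be aliased is an assumption of this sketch.
  def aliasMaps(
      nestedFields: Seq[Field],
      wholeRoots: Set[Attr]): Option[(Map[Field, AliasOf], Map[Long, Seq[AliasOf]])] = {
    // Skip fields whose root attribute is also read as a whole:
    // the full struct is needed anyway, so aliasing would save nothing.
    val candidates = nestedFields.filterNot(f => wholeRoots.contains(f.root)).distinct

    // One alias per distinct nested field; the names are illustrative only.
    val aliases = candidates.zipWithIndex.map { case (f, i) => AliasOf(f, s"alias_$i") }

    if (aliases.isEmpty) None
    else Some((
      aliases.map(a => a.field -> a).toMap,
      aliases.groupBy(_.field.root.exprId)))
  }
}
```

The first map lets a project list swap each nested field for its alias; the second lets the child's output swap each root attribute for the aliases that point to it.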



[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266046563
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and `GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
 
 Review comment:
   Thanks.



[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266046156
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and `GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
 
 Review comment:
   No problem.
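
The `helper` whose first line is quoted above walks an expression tree; its match arms appear in other messages of this thread. A self-contained sketch of the same traversal follows, using hypothetical stand-in types (`Attr`, `Field`, `Call`, `Lit`) instead of Catalyst's expression classes.

```scala
object CollectSketch {
  // Hypothetical stand-ins for Catalyst expressions (not Spark classes).
  sealed trait Expr { def children: Seq[Expr] }
  final case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil }
  final case class Field(child: Expr, name: String) extends Expr {
    val children: Seq[Expr] = Seq(child)
  }
  final case class Call(fn: String, children: Seq[Expr]) extends Expr
  final case class Lit(value: Int) extends Expr { val children: Seq[Expr] = Nil }

  // Stop at the first attribute or nested-field access on each path;
  // otherwise descend into the children. Leaves such as literals yield nothing.
  def collect(e: Expr): Seq[Expr] = e match {
    case _: Attr | _: Field => Seq(e)
    case other if other.children.nonEmpty => other.children.flatMap(collect)
    case _ => Seq.empty
  }
}
```

Because the traversal stops at a nested-field access without descending into it, a root attribute is reported on its own only when it is used as a whole, which is what the later partition into nested and root references relies on.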


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266045687
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and `GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
+  .filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
+  .groupBy(_.references.head)
+  .flatMap { case (attr: Attribute, nestedFields: Seq[GetStructField]) =>
+// Each expression can contain multiple nested fields.
+// Note that we keep the 

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-15 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r266042807
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ##
 @@ -647,6 +647,10 @@ object ColumnPruning extends Rule[LogicalPlan] {
 // Can't prune the columns on LeafNode
 case p @ Project(_, _: LeafNode) => p
 
+case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases)
 
 Review comment:
   Of course! Thanks.
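
The quoted `case p @ NestedColumnAliasing(...)` line relies on Scala's extractor-object pattern: an `object` with an `unapply` that returns an `Option`. A minimal sketch of that mechanism with hypothetical miniature plan nodes is below; the real extractor returns a pair of maps rather than the single `Seq[String]` used here.

```scala
object ExtractorSketch {
  // Hypothetical miniature plan nodes (not Spark's LogicalPlan classes).
  sealed trait Plan
  final case class Scan(columns: Seq[String]) extends Plan
  final case class Limit(n: Int, child: Plan) extends Plan
  final case class Sort(child: Plan) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan

  // Extractor: matches only a Project sitting directly on a Limit and
  // hands back the dotted (nested) column names it found.
  object NestedOverLimit {
    def unapply(plan: Plan): Option[Seq[String]] = plan match {
      case Project(cols, _: Limit) =>
        val nested = cols.filter(_.contains("."))
        if (nested.isEmpty) None else Some(nested)
      case _ => None
    }
  }

  // `p @ Extractor(...)` binds the whole node and the extracted value at once,
  // so a rule can rewrite `p` using what the extractor computed.
  def rewriteTarget(plan: Plan): Option[(Plan, Seq[String])] = plan match {
    case p @ NestedOverLimit(nested) => Some((p, nested))
    case _ => None
  }
}
```

Returning `None` from `unapply` makes the `case` fall through to the next one, which keeps the "does this rule apply?" check and the data it needs in one place.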





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-14 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r265707312
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references to nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new childen replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references and `GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(plan: LogicalPlan): Seq[Expression] = {
+def helper(e: Expression): Seq[Expression] = e match {
+  case _: AttributeReference | _: GetStructField => Seq(e)
+  case es if es.children.nonEmpty => es.children.flatMap(helper)
+  case _ => Seq.empty
+}
+plan.expressions.flatMap(helper)
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
+   */
+  private def getAliasSubMap(plans: LogicalPlan*)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+val (nestedFieldReferences, otherRootReferences) = plans
+  .map(collectRootReferenceAndGetStructField).reduce(_ ++ _).partition {
+case _: GetStructField => true
+case _ => false
+  }
+
+val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
+  .filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
+  .groupBy(_.references.head)
+  .flatMap { case (attr: Attribute, nestedFields: Seq[GetStructField]) =>
+// Each expression can contain multiple nested fields.
+// Note that we keep the original names to deliver to parquet in a 

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-14 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r265658832
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
 ##
 @@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.SchemaPruningTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class NestedColumnAliasingSuite extends SchemaPruningTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches = Batch("Nested column pruning", FixedPoint(100),
+  ColumnPruning,
+  CollapseProject,
+  RemoveNoopOperators) :: Nil
+  }
+
+  private val name = StructType.fromDDL("first string, middle string, last string")
+  private val employer = StructType.fromDDL("id int, company struct")
+  private val contact = LocalRelation(
+'id.int,
+'name.struct(name),
+'address.string,
+'friends.array(name),
+'relatives.map(StringType, name),
+'employer.struct(employer))
+
+  test("Pushing a single nested field projection") {
+testSingleFieldPushDown("limit", (input: LogicalPlan) => input.limit(5))
+testSingleFieldPushDown("repartition", (input: LogicalPlan) => input.repartition(1))
+testSingleFieldPushDown("sample", (input: LogicalPlan) => Sample(0.0, 0.6, false, 11L, input))
+  }
+
+  test("Pushing multiple nested field projection") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, first, last)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first, last)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("function with nested field inputs") {
+val first = GetStructField('name, 0, Some("first"))
+val last = GetStructField('name, 2, Some("last"))
+
+val query = contact
+  .limit(5)
+  .select('id, ConcatWs(Seq(first, last)))
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, ConcatWs(Seq(first, last)))
+  .limit(5)
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("multi-level nested field") {
+val field1 = GetStructField(GetStructField('employer, 1, Some("company")), 0, Some("name"))
+val field2 = GetStructField('employer, 0, Some("id"))
+
+val query = contact
+  .limit(5)
+  .select(field1, field2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select(field1, field2)
+  .limit(5)
+  .analyze
+comparePlans(optimized, expected)
+  }
+
+  test("Push original case-sensitive names") {
+val first1 = GetStructField('name, 0, Some("first"))
+val first2 = GetStructField('name, 1, Some("FIRST"))
+
+val query = contact
+  .limit(5)
+  .select('id, first1, first2)
+  .analyze
+
+val optimized = Optimize.execute(query)
+
+val expected = contact
+  .select('id, first1, first2)
+  .limit(5)
+  .analyze
+
+comparePlans(optimized, expected)
+  }
+
+  test("Pushing a single nested field projection - negative") {
+val ops = Array(
+  (input: LogicalPlan) => input.distribute('name)(1),
+  (input: LogicalPlan) => input.distribute($"name.middle")(1),
+  (input: LogicalPlan) => input.orderBy('name.asc),
+  (input: LogicalPlan) => input.orderBy($"name.middle".asc),
+  (input: LogicalPlan) => input.sortBy('name.asc),
+  (input: LogicalPlan) => input.sortBy($"name.middle".asc),
+  (input: LogicalPlan) => 

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-14 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r265657125
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
 
 Review comment:
   Also, the `CollapseProject` improvement was merged first for that reason. It 
prevents aliases from piling up at every step.
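
As a rough illustration of why collapsing matters here: if every pushdown step leaves its own aliasing `Project` behind, stacked projections pile up unless adjacent ones are merged. The sketch below uses a toy plan ADT; `Scan`, `Project`, and `collapse` are hypothetical names, not Catalyst classes or Spark's actual `CollapseProject` rule, and it only handles plain column names.

```scala
// Toy model only: hypothetical stand-ins, not Spark's CollapseProject.
object CollapseAdjacentProjects {
  sealed trait Plan
  case class Scan(columns: Seq[String]) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // Merge Project-over-Project into one Project when the outer list
  // only selects columns that the inner list already produces.
  def collapse(plan: Plan): Plan = plan match {
    case Project(outer, Project(inner, child)) if outer.forall(inner.contains) =>
      collapse(Project(outer, child))
    case Project(cols, child) => Project(cols, collapse(child))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val stacked = Project(Seq("a"), Project(Seq("a", "b"), Scan(Seq("a", "b", "c"))))
    println(collapse(stacked))
  }
}
```

In this toy version the inner projection disappears whenever the outer one is a subset of it; a real rule also has to substitute alias definitions, which is omitted here.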


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-14 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r265656346
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
+case _: Repartition => true
+case _: Sample => true
+case _ => false
+  }
+
+  /**
+   * Return root references and `GetStructField`s.
 
 Review comment:
   Sure!





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-10 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r264098307
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
 
 Review comment:
   @maropu . Since that is a general issue, I created 
[SPARK-27123](https://issues.apache.org/jira/browse/SPARK-27123) and made a 
[PR](https://github.com/apache/spark/pull/24049) for that.





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-08 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263884012
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
 
 Review comment:
   Sure. Thanks. The no-op Alias issue is a general one because users can create 
it, too. I'll try to file a general JIRA issue for removing no-op Aliases.





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-07 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263617985
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
 
 Review comment:
   Yes. Right. Currently, I didn't add an additional optimizer rule to handle that 
redundant one. However, the benefit of the reduced data size will outweigh the 
cost of the additional simple projection.
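
To make the trade-off concrete, here is a toy sketch. Plain Scala case classes stand in for logical operators; `Scan`, `Limit`, `Project`, and `pushDown` are hypothetical names, not the PR's code. Pushing a narrowing projection under the limit leaves a redundant outer projection, but only the requested fields reach the limit.

```scala
// Toy model only: hypothetical stand-ins for logical plan nodes.
object PushProjectBelowLimit {
  sealed trait Plan
  case class Scan(columns: Seq[String]) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // Insert a narrowing Project under the Limit so that only the requested
  // (possibly nested) fields flow through the limit operator.
  // The original outer Project is kept, which is the redundant one.
  def pushDown(plan: Plan): Plan = plan match {
    case Project(cols, Limit(n, child)) =>
      Project(cols, Limit(n, Project(cols, child)))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val query = Project(Seq("id", "name.first"), Limit(5, Scan(Seq("id", "name", "address"))))
    println(pushDown(query))
  }
}
```

The outer `Project` is now a no-op over the inner one, which is the redundancy mentioned above.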





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-07 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263622325
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases)))
+
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
 
 Review comment:
   I'll add one negative case with Union~





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-07 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263620410
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+g.copy(child = replaceChildrenWithAliases(grandChild, attrToAliases)))
+
+case Project(projectList, child) =>
+  Project(
+getNewProjectList(projectList, nestedFieldToAlias),
+replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+  projectList: Seq[NamedExpression],
+  nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+projectList.map(_.transform {
+  case f: GetStructField if nestedFieldToAlias.contains(f) =>
+nestedFieldToAlias(f).toAttribute
+}.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+  plan: LogicalPlan,
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+plan.withNewChildren(plan.children.map { plan =>
+  Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, Seq(a))), plan)
+})
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+case _: GlobalLimit => true
+case _: LocalLimit => true
 
 Review comment:
   Since this PR doesn't support `Union` and `Join`, `LocalLimit` w/o 
`GlobalLimit` is not considered. Do you mean some negative cases with `Union` 
and `Join`?
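
For reference, the kind of negative case being discussed can be sketched with a toy plan ADT. The predicate mirrors the shape of `canProjectPushThrough`; the case classes are simplified, hypothetical stand-ins (no limit expressions, join conditions, and so on), and `Union` and `Join` simply fall through to `false`.

```scala
// Toy model only: simplified, hypothetical stand-ins for logical operators.
object PushThroughWhitelist {
  sealed trait Plan
  case object Relation extends Plan
  case class GlobalLimit(child: Plan) extends Plan
  case class LocalLimit(child: Plan) extends Plan
  case class Repartition(child: Plan) extends Plan
  case class Sample(child: Plan) extends Plan
  case class Union(children: Seq[Plan]) extends Plan
  case class Join(left: Plan, right: Plan) extends Plan

  // Only whitelisted unary operators allow a project to be pushed through.
  def canProjectPushThrough(plan: Plan): Boolean = plan match {
    case _: GlobalLimit | _: LocalLimit | _: Repartition | _: Sample => true
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    println(canProjectPushThrough(GlobalLimit(LocalLimit(Relation))))
    println(canProjectPushThrough(Union(Seq(Relation, Relation))))
    println(canProjectPushThrough(Join(Relation, Relation)))
  }
}
```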





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-07 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r263617985
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle nested column aliasing pattern inside `ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the fields
+ * in a nested attribute are used, we can substitute them by alias attributes; then a project
+ * of the nested fields as aliases on the children of the child will be created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+: Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan match {
+case Project(_, child) if canProjectPushThrough(child) =>
+  getAliasSubMap(plan, child)
+case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+  plan: LogicalPlan,
+  nestedFieldToAlias: Map[GetStructField, Alias],
+  attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+case Project(projectList, g @ GlobalLimit(_, grandChild: LocalLimit)) =>
 
 Review comment:
   Yes. Right. Currently, I didn't add an additional optimizer rule to handle that 
redundant one.





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-05 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r262661446
 
 

 ##
 File path: sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
 ##
 @@ -9,19 +9,19 @@ Selection: Best Time(ms)   Avg Time(ms)   Stdev(m
 Top-level column                88            114          16        11.4          87.5       1.0X
 Nested column                  201            223          27         5.0         200.5       0.4X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.3
-Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Mac OS X 10.14.3
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
 Limiting:            Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 
-Top-level column               263            315          36         3.8         263.2       1.0X
-Nested column                 2111           2622         613         0.5        2111.1       0.1X
+Top-level column               230            236           4         4.4         229.5       1.0X
+Nested column                  356            367           9         2.8         355.9       0.6X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.3
-Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Mac OS X 10.14.3
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
 Repartitioning:      Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 
-Top-level column               222            250          34         4.5         222.2       1.0X
-Nested column                 2084           2339         266         0.5        2084.2       0.1X
+Top-level column               197            201           6         5.1         196.6       1.0X
+Nested column                  337            345           7         3.0         336.6       0.6X
 
 Review comment:
   This is a result from the master branch a few minutes ago. I intentionally 
excluded the other irrelevant changes and kept the relevant ones like above.
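
For a sense of scale, the before/after nested-column timings in the diff above can be turned into speedup ratios. This is only arithmetic on the Best Time(ms) figures quoted there; the object and method names are made up for illustration.

```scala
// Illustrative arithmetic only; names here are hypothetical, not part of Spark.
object NestedPruningSpeedup {
  // Ratio of the old best time to the new best time (both in ms).
  def speedup(beforeMs: Double, afterMs: Double): Double = beforeMs / afterMs

  def main(args: Array[String]): Unit = {
    // Best Time(ms) for "Nested column", taken from the diff above.
    val limiting = speedup(2111, 356)
    val repartitioning = speedup(2084, 337)
    println(f"Limiting:       $limiting%.1fx faster")
    println(f"Repartitioning: $repartitioning%.1fx faster")
  }
}
```

Both nested-column cases come out at roughly 6x faster, while the top-level column rows move much less.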





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] Support nested-column pruning over limit/sample/repartition

2019-03-04 Thread GitBox
dongjoon-hyun commented on a change in pull request #23964: [SPARK-26975][SQL] 
Support nested-column pruning over limit/sample/repartition
URL: https://github.com/apache/spark/pull/23964#discussion_r262214266
 
 

 ##
 File path: sql/core/benchmarks/NestedSchemaPruningBenchmark-results.txt
 ##
 @@ -4,37 +4,37 @@ Nested Schema Pruning Benchmark
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Mac OS X 10.14.3
 Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
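-Selection:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------
-Top-level column             59 /   68         16.9          59.1       1.0X
-Nested column               180 /  186          5.6         179.7       0.3X
+Selection:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------
+Top-level column                 62             71           9         16.2          61.6       1.0X
+Nested column                   178            185           6          5.6         178.0       0.3X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Mac OS X 10.14.3
 Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
-Limiting:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------
-Top-level column            241 /  246          4.2         240.9       1.0X
-Nested column              1828 / 1904          0.5        1827.5       0.1X
+Limiting:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------
+Top-level column                241            244           3          4.2         240.8       1.0X
+Nested column                   358            371           8          2.8         358.2       0.7X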
 
 Review comment:
   This case and the next one are in scope for this PR.
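
For readers checking the `Relative` column, the sketch below reproduces it for the Limiting rows. It assumes `Relative` is the top-level column's `Per Row(ns)` divided by the row's own `Per Row(ns)`, printed to one decimal place. That reading is inferred from the numbers in the hunk, not confirmed by this thread, and `RelativeColumn` is a hypothetical name.

```scala
import java.util.Locale

// Hypothetical helper, not part of Spark: recomputes the Relative column.
object RelativeColumn {
  // Assumed definition: baseline per-row time over this case's per-row time.
  // Values below 1.0 mean the case is slower than the baseline.
  def relative(baselineNs: Double, caseNs: Double): Double = baselineNs / caseNs

  // Format like the table's last column, e.g. "0.7X".
  def render(ratio: Double): String = "%.1fX".formatLocal(Locale.ROOT, ratio)

  def main(args: Array[String]): Unit = {
    // Per Row(ns) values copied from the Limiting rows in the hunk above.
    val before = relative(baselineNs = 240.9, caseNs = 1827.5)
    val after = relative(baselineNs = 240.8, caseNs = 358.2)
    println(s"Limiting, nested column: ${render(before)} -> ${render(after)}")
  }
}
```

Under that assumption the nested-column Limiting case moves from 0.1X to 0.7X of the top-level baseline, matching the removed and added rows.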

