[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19683


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r159035379
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 ---
@@ -227,4 +228,45 @@ class MiscBenchmark extends BenchmarkBase {
 generate stack wholestage on               836 /  847          20.1          49.8      15.5X
  */
   }
+
+  ignore("generate explode big struct array") {
+    val N = 6
+
+    val spark = sparkSession
+    import org.apache.spark.sql.functions._
+    import spark.implicits._
+
+    val df = sparkSession.sparkContext.parallelize(Seq(("1",
+      Array.fill(N)({
+        val i = math.random
+        (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
+      })))).toDF("col", "arr")
+
+    runBenchmark("generate big struct array", N) {
+      df.withColumn("arr_col", explode('arr)).select("col", "arr_col.*").count
+    }
+
+  }
+
+/*
--- End diff --

This benchmark result should be moved up to be included in the ignored test 
block.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r159035248
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 ---
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.util.Benchmark
 
+
--- End diff --

nit: unnecessary blank line.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r159035178
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -73,8 +73,13 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  * their output.
  *
  * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the input tuple that produced
- *  it.
+ * @param unrequiredChildIndex this paramter starts as Nil and gets filled by the Optimizer.
+ *  It's used as an optimization for omitting data generation that will
+ *  be discarded next by a projection.
+ *  A common use case is when we explode(array(..)) and are interested
+ *  only in the exploded data and not in the original array. before this
+ *  optimization the array got duplicated for each of its elements,
+ *  causing O(n^^2) memory consumption. (see [SPARK-21657])
--- End diff --

nit: there seems to be an extra space from the 2nd line onward.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-28 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r159033662
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -85,11 +84,19 @@ case class GenerateExec(
 val numOutputRows = longMetric("numOutputRows")
 child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
   val generatorNullRow = new 
GenericInternalRow(generator.elementSchema.length)
-  val rows = if (join) {
+  val rows = if (requiredChildOutput.nonEmpty) {
+
+val pruneChildForResult: InternalRow => InternalRow =
+  if ((child.outputSet -- requiredChildOutput).isEmpty) {
--- End diff --

Wouldn't it always return false? Or should I use `child.output == 
AttributeSet(requiredChildOutput)`?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r159033021
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -85,11 +84,19 @@ case class GenerateExec(
 val numOutputRows = longMetric("numOutputRows")
 child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
   val generatorNullRow = new 
GenericInternalRow(generator.elementSchema.length)
-  val rows = if (join) {
+  val rows = if (requiredChildOutput.nonEmpty) {
+
+val pruneChildForResult: InternalRow => InternalRow =
+  if ((child.outputSet -- requiredChildOutput).isEmpty) {
--- End diff --

just `child.output == requiredChildOutput`?
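
For illustration, a toy, self-contained sketch of what the two conditions check; plain strings stand in for attributes here, so this is not Spark's `AttributeSet` (which compares attributes by `exprId`):
```
// requiredChildOutput is always an order-preserving, filtered subset of child.output,
// so "nothing was pruned" (empty set difference) and "exactly equal" coincide.
object PruneCheckSketch extends App {
  val childOutput = Seq("a", "b", "c")

  for (unrequired <- Seq(Set.empty[String], Set("b"))) {
    val requiredChildOutput = childOutput.filterNot(unrequired)
    val diffEmpty = (childOutput.toSet -- requiredChildOutput).isEmpty  // condition in the diff
    val exactlyEqual = childOutput == requiredChildOutput               // the suggested check
    println(s"unrequired=$unrequired diffEmpty=$diffEmpty exactlyEqual=$exactlyEqual")
  }
}
```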


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r159032990
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -47,8 +47,13 @@ private[execution] sealed case class LazyIterator(func: 
() => TraversableOnce[In
  * terminate().
  *
  * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the 
input tuple that produced
- *  it.
+ * @param requiredChildOutput this paramter starts as Nil and gets filled 
by the Optimizer.
--- End diff --

we don't need to duplicate the comment here, just say `required attributes 
from child output`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r159031802
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -57,20 +62,19 @@ private[execution] sealed case class LazyIterator(func: 
() => TraversableOnce[In
  */
 case class GenerateExec(
 generator: Generator,
-join: Boolean,
+unrequiredChildIndex: Seq[Int],
--- End diff --

The physical plan can just take `requiredChildOutput`, and in the planner 
we can just do
```
case g @ logical.Generate(...) => GenerateExec(..., g.requiredChildOutput)
```


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r159031526
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -73,25 +73,32 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  * their output.
  *
  * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the input tuple that produced
- *  it.
+ * @param unrequiredChildIndex this paramter starts as Nil and gets filled by the Optimizer.
+ *  It's used as an optimization for omitting data generation that will
+ *  be discarded next by a projection.
+ *  A common use case is when we explode(array(..)) and are interested
+ *  only in the exploded data and not in the original array. before this
+ *  optimization the array got duplicated for each of its elements,
+ *  causing O(n^^2) memory consumption. (see [SPARK-21657])
  * @param outer when true, each input row will be output at least once, even if the output of the
  *  given `generator` is empty.
  * @param qualifier Qualifier for the attributes of generator(UDTF)
  * @param generatorOutput The output schema of the Generator.
  * @param child Children logical plan node
  */
 case class Generate(
-generator: Generator,
-join: Boolean,
-outer: Boolean,
-qualifier: Option[String],
-generatorOutput: Seq[Attribute],
-child: LogicalPlan)
+ generator: Generator,
+ unrequiredChildIndex: Seq[Int],
+ outer: Boolean,
+ qualifier: Option[String],
+ generatorOutput: Seq[Attribute],
+ child: LogicalPlan)
--- End diff --

wrong indentation?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158906785
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  * their output.
  *
  * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the input tuple that produced
- *  it.
+ * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the
--- End diff --

Will try to improve.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158906660
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---
@@ -359,12 +359,12 @@ package object dsl {
 
   def generate(
 generator: Generator,
-join: Boolean = false,
+unrequiredChildOutput: Seq[Attribute] = Nil,
--- End diff --

It looks like it's a mistake. I see it was only used in the test suites, and 
they all overrode it.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158902856
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -85,11 +86,20 @@ case class GenerateExec(
 val numOutputRows = longMetric("numOutputRows")
 child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
   val generatorNullRow = new 
GenericInternalRow(generator.elementSchema.length)
-  val rows = if (join) {
+  val rows = if (requiredChildOutput.nonEmpty) {
+
+val pruneChildForResult: InternalRow => InternalRow =
+  if (unrequiredChildOutput.isEmpty) {
+identity
+  } else {
+UnsafeProjection.create(requiredChildOutput, child.output)
+  }
+
 val joinedRow = new JoinedRow
 iter.flatMap { row =>
+
   // we should always set the left (child output)
--- End diff --

`child output` -> `required child output`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158891945
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  * their output.
  *
  * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the input tuple that produced
- *  it.
+ * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the
--- End diff --

OK. But it seems the param doc can be improved.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158892688
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---
@@ -359,12 +359,12 @@ package object dsl {
 
   def generate(
 generator: Generator,
-join: Boolean = false,
+unrequiredChildOutput: Seq[Attribute] = Nil,
--- End diff --

Previously `join` was false by default. This default `unrequiredChildOutput` 
value seems to contradict the previous usage. Should we stay consistent with it?
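
For context, the correspondence between the old flag and the new parameter (as noted elsewhere in this thread, `join = false` means `unrequiredChildOutput = child.output`, while `Nil` keeps the whole child output) can be captured in a tiny standalone sketch, with strings standing in for attributes:
```
object JoinFlagMappingSketch extends App {
  val childOutput = Seq("a", "b", "c")

  // old semantics: join = true keeps the whole child output, join = false drops it
  def keptByOldJoin(join: Boolean): Seq[String] =
    if (join) childOutput else Nil

  // new semantics: unrequiredChildOutput lists what to drop from the child output
  def keptByNew(unrequiredChildOutput: Seq[String]): Seq[String] =
    childOutput.filterNot(unrequiredChildOutput.toSet)

  assert(keptByOldJoin(join = true)  == keptByNew(Nil))          // the new default
  assert(keptByOldJoin(join = false) == keptByNew(childOutput))  // the old dsl default
}
```
So the new default `Nil` corresponds to the old `join = true`, not to the old dsl default `join = false`.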


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158891168
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 ---
@@ -227,4 +227,30 @@ class MiscBenchmark extends BenchmarkBase {
 generate stack wholestage on               836 /  847          20.1          49.8      15.5X
  */
   }
+
+  ignore("generate explode big struct array") {
+val N = 6
+
+val spark = sparkSession
+import spark.implicits._
+import org.apache.spark.sql.functions._
+
+val df = sparkSession.sparkContext.parallelize(
--- End diff --

and put the result like other benchmarks in this file, e.g.
```
/*
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
 Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
 
 generate stack:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------
 generate stack wholestage off                12953 / 13070          1.3        772.1       1.0X
 generate stack wholestage on                   836 /  847          20.1         49.8      15.5X
   */
```
You can run this benchmark without your PR and put the result in a PR comment.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158891075
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 ---
@@ -227,4 +227,30 @@ class MiscBenchmark extends BenchmarkBase {
 generate stack wholestage on               836 /  847          20.1          49.8      15.5X
  */
   }
+
+  ignore("generate explode big struct array") {
+val N = 6
+
+val spark = sparkSession
+import spark.implicits._
+import org.apache.spark.sql.functions._
+
+val df = sparkSession.sparkContext.parallelize(
--- End diff --

please make it a real benchmark
```
val df = Seq("1", Array.fill(N)(i => (i.toString, (i + 1).toString, (i + 
2).toString, (i + 3).toString))).toDF("col", "arr")
runBenchmark("generate big struct array", N) {
  df.withColumn("arr_col", explode('arr)).select("col", "arr_col.*").count
}
```
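
As written, that sketch would not quite compile: `Array.fill` takes a by-name element rather than an index function, and `toDF("col", "arr")` expects one two-field tuple per row. A compiling variant, assuming the `sparkSession`, `runBenchmark` and implicits available in `MiscBenchmark`, might look like:
```
import org.apache.spark.sql.functions._

val spark = sparkSession
import spark.implicits._

// one row: a key column plus an array of N four-field structs
val df = Seq((
  "1",
  Array.tabulate(N)(i => (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString))
)).toDF("col", "arr")

runBenchmark("generate big struct array", N) {
  df.withColumn("arr_col", explode('arr)).select("col", "arr_col.*").count()
}
```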


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158890765
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -276,22 +276,24 @@ class PlanParserSuite extends AnalysisTest {
 assertEqual(
   "select * from t lateral view explode(x) expl as x",
   table("t")
-.generate(explode, join = true, outer = false, Some("expl"), 
Seq("x"))
+.generate(explode, alias = Some("expl"), outputNames = Seq("x"))
 .select(star()))
 
 // Multiple lateral views
+val exploded = table("t")
+  .generate(explode, alias = Some("expl"))
+
 assertEqual(
   """select *
 |from t
 |lateral view explode(x) expl
 |lateral view outer json_tuple(x, y) jtup q, z""".stripMargin,
-  table("t")
-.generate(explode, join = true, outer = false, Some("expl"), 
Seq.empty)
-.generate(jsonTuple, join = true, outer = true, Some("jtup"), 
Seq("q", "z"))
+  exploded
+.generate(jsonTuple, outer = true, alias = Some("jtup"), 
outputNames = Seq("q", "z"))
--- End diff --

and remove
```
val exploded = table("t")
  .generate(explode, alias = Some("expl"))
```


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158890738
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -276,22 +276,24 @@ class PlanParserSuite extends AnalysisTest {
 assertEqual(
   "select * from t lateral view explode(x) expl as x",
   table("t")
-.generate(explode, join = true, outer = false, Some("expl"), 
Seq("x"))
+.generate(explode, alias = Some("expl"), outputNames = Seq("x"))
 .select(star()))
 
 // Multiple lateral views
+val exploded = table("t")
+  .generate(explode, alias = Some("expl"))
+
 assertEqual(
   """select *
 |from t
 |lateral view explode(x) expl
 |lateral view outer json_tuple(x, y) jtup q, z""".stripMargin,
-  table("t")
-.generate(explode, join = true, outer = false, Some("expl"), 
Seq.empty)
-.generate(jsonTuple, join = true, outer = true, Some("jtup"), 
Seq("q", "z"))
+  exploded
+.generate(jsonTuple, outer = true, alias = Some("jtup"), 
outputNames = Seq("q", "z"))
--- End diff --

nit:
```
table("t")
  .generate(explode, alias = Some("expl"))
  .generate(jsonTuple, outer = true, alias = Some("jtup"), outputNames = 
Seq("q", "z"))
```
to make it more similar to the previous code.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158890659
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 ---
@@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest {
   CollapseProject) :: Nil
   }
 
-  test("Column pruning for Generate when Generate.join = false") {
-val input = LocalRelation('a.int, 'b.array(StringType))
+  test("Column pruning for Generate when Generate.unrequiredChildOutput = 
Nil") {
+val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
 
-val query = input.generate(Explode('b), join = false).analyze
+val query =
+  input
+.generate(Explode('c), outputNames = "explode" :: Nil)
+.select('c, 'explode)
+.analyze
 
 val optimized = Optimize.execute(query)
 
-val correctAnswer = input.select('b).generate(Explode('b), join = 
false).analyze
+val correctAnswer =
+  input
+.select('c)
+.generate(Explode('c), unrequiredChildOutput = Nil,
--- End diff --

don't need `unrequiredChildOutput = Nil` as it's the default value


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158890604
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -444,12 +444,17 @@ object ColumnPruning extends Rule[LogicalPlan] {
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
-case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
-  g.copy(child = prunedChild(g.child, g.references))
 
-// Turn off `join` for Generate if no column from it's child is used
-case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
-  p.copy(child = g.copy(join = false))
+// prune unrequired references
+case p @ Project(_, g: Generate) =>
+  if (p.references == g.outputSet) {
+p
+  } else {
+val requiredAttrs = p.references -- g.producedAttributes ++ 
g.generator.references
+val newChild = prunedChild(g.child, requiredAttrs)
+p.copy(child = g.copy(child = newChild,
+  unrequiredChildOutput = (g.generator.references 
-- p.references).toSeq))
--- End diff --

a small improvement to allow us to still hit `case p @ Project(_, child) if sameOutput(child.output, p.output) => child` later
```
case p @ Project(_, g: Generate) if p.references != g.outputSet =>
  ...
```
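
Putting that guard together with the rule body quoted in this thread, a sketch of the combined case (not necessarily the final merged code) would be:
```
// prune unrequired references; the guard lets a no-op Project above Generate
// fall through to the later `sameOutput` elimination case
case p @ Project(_, g: Generate) if p.references != g.outputSet =>
  val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
  val newChild = prunedChild(g.child, requiredAttrs)
  val unrequired = (g.generator.references -- p.references).toSeq
  p.copy(child = g.copy(child = newChild, unrequiredChildOutput = unrequired))
```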


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158890267
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -444,12 +444,17 @@ object ColumnPruning extends Rule[LogicalPlan] {
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
-case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
-  g.copy(child = prunedChild(g.child, g.references))
 
-// Turn off `join` for Generate if no column from it's child is used
-case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
-  p.copy(child = g.copy(join = false))
+// prune unrequired references
+case p @ Project(_, g: Generate) =>
+  if (p.references == g.outputSet) {
+p
+  } else {
+val requiredAttrs = p.references -- g.producedAttributes ++ 
g.generator.references
+val newChild = prunedChild(g.child, requiredAttrs)
+p.copy(child = g.copy(child = newChild,
+  unrequiredChildOutput = (g.generator.references 
-- p.references).toSeq))
--- End diff --

nit: code style
```
val unrequired = (g.generator.references -- p.references).toSeq
p.copy(child = g.copy(child = newChild, unrequiredChildOutput = unrequired))
```
Avoid very long statements; create local variables to shorten them.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158890134
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1605,7 +1607,8 @@ class Analyzer(
 resolvedGenerator =
   Generate(
 generator,
-join = projectList.size > 1, // Only join if there are other expressions in SELECT.
+// unrequiredChildOutput=Nil if there are other expressions in SELECT.
+unrequiredChildOutput = if (projectList.size > 1) Nil else child.output,
--- End diff --

Maybe just use `Nil` here and let the optimizer do the work.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158890001
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
-case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
-  g.copy(child = prunedChild(g.child, g.references))
 
-// Turn off `join` for Generate if no column from it's child is used
-case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
-  p.copy(child = g.copy(join = false))
+// prune unrequired references
+case g : Generate
+  if (AttributeSet(g.unrequiredChildOutput) -- 
g.generator.references).nonEmpty =>
+g.copy(child = prunedChild(g.child,
+g.child.outputSet -- g.unrequiredChildOutput ++ 
g.generator.references),
+   unrequiredChildOutput = g.generator.references.toSeq)
+
+// Sync Generate's unrequiredChildOutput with the actual needed outputs
+case p @ Project(_, g: Generate) =>
--- End diff --

makes sense, we should move this rule below `case p @ Project(_, child) if 
sameOutput(child.output, p.output) => child`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158889881
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -83,15 +85,17 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalPlan) extend
  */
 case class Generate(
 generator: Generator,
-join: Boolean,
+unrequiredChildOutput: Seq[Attribute],
--- End diff --

we can just add it in the constructor.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158854831
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -83,15 +85,17 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalPlan) extend
  */
 case class Generate(
 generator: Generator,
-join: Boolean,
+unrequiredChildOutput: Seq[Attribute],
--- End diff --

Not sure where; just add it to the `override lazy val resolved: Boolean`?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158854751
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
-case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
-  g.copy(child = prunedChild(g.child, g.references))
 
-// Turn off `join` for Generate if no column from it's child is used
-case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
-  p.copy(child = g.copy(join = false))
+// prune unrequired references
+case g : Generate
+  if (AttributeSet(g.unrequiredChildOutput) -- 
g.generator.references).nonEmpty =>
+g.copy(child = prunedChild(g.child,
+g.child.outputSet -- g.unrequiredChildOutput ++ 
g.generator.references),
+   unrequiredChildOutput = g.generator.references.toSeq)
+
+// Sync Generate's unrequiredChildOutput with the actual needed outputs
+case p @ Project(_, g: Generate) =>
--- End diff --

Yep, I agree.
Although, I'm not sure why we "swallow" this case here instead of letting it 
proceed to other rules like:
// Eliminate no-op Projects
case p @ Project(_, child) if sameOutput(child.output, p.output) => child

because in effect the Generate is now doing the job of the Project.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158845778
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -276,22 +276,24 @@ class PlanParserSuite extends AnalysisTest {
 assertEqual(
   "select * from t lateral view explode(x) expl as x",
   table("t")
-.generate(explode, join = true, outer = false, Some("expl"), 
Seq("x"))
+.generate(explode, alias = Some("expl"), outputNames = Seq("x"))
 .select(star()))
 
 // Multiple lateral views
+val exploded = table("t")
+  .generate(explode, alias = Some("expl"))
+
 assertEqual(
   """select *
 |from t
 |lateral view explode(x) expl
 |lateral view outer json_tuple(x, y) jtup q, z""".stripMargin,
-  table("t")
-.generate(explode, join = true, outer = false, Some("expl"), 
Seq.empty)
-.generate(jsonTuple, join = true, outer = true, Some("jtup"), 
Seq("q", "z"))
+ exploded
+.generate(jsonTuple, outer = true, alias = Some("jtup"), 
outputNames = Seq("q", "z"))
--- End diff --

can we keep the previous code style and inline `exploded`?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158845677
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 ---
@@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest {
   CollapseProject) :: Nil
   }
 
-  test("Column pruning for Generate when Generate.join = false") {
-val input = LocalRelation('a.int, 'b.array(StringType))
+  test("Column pruning for Generate when Generate.unrequiredChildOutput = 
Nil ") {
+val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
 
-val query = input.generate(Explode('b), join = false).analyze
+val query =
+  input
+.generate(Explode('c), outputNames = "explode" :: Nil)
+.select('a, 'explode)
+.analyze
 
 val optimized = Optimize.execute(query)
 
-val correctAnswer = input.select('b).generate(Explode('b), join = 
false).analyze
+val correctAnswer =
+  input
+.select('a, 'c)
+.generate(Explode('c), unrequiredChildOutput = 
input.select('c).analyze.references.toSeq,
+  outputNames = "explode" :: Nil)
+.select('a, 'explode)
+.analyze
 
 comparePlans(optimized, correctAnswer)
   }
 
-  test("Column pruning for Generate when Generate.join = true") {
-val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
+  test("Fill Generate.unrequiredChildOutput if possible") {
+val input = LocalRelation('b.array(StringType))
 
 val query =
   input
-.generate(Explode('c), join = true, outputNames = "explode" :: Nil)
-.select('a, 'explode)
+.generate(Explode('b), outputNames = "explode" :: Nil)
+.select(('explode + 1).as("result"))
 .analyze
 
 val optimized = Optimize.execute(query)
 
 val correctAnswer =
   input
-.select('a, 'c)
-.generate(Explode('c), join = true, outputNames = "explode" :: Nil)
-.select('a, 'explode)
+.generate(Explode('b), unrequiredChildOutput = 
input.references.toSeq,
+  outputNames = "explode" :: Nil)
+ .select(('explode + 1).as("result"))
 .analyze
 
 comparePlans(optimized, correctAnswer)
   }
 
-  test("Turn Generate.join to false if possible") {
-val input = LocalRelation('b.array(StringType))
+  test("Another fill Generate.unrequiredChildOutput if possible") {
+val input = LocalRelation('a.int, 'b.int, 'c1.string, 'c2.string)
 
 val query =
   input
-.generate(Explode('b), join = true, outputNames = "explode" :: Nil)
-.select(('explode + 1).as("result"))
+.generate(Explode(CreateArray(Seq('c1, 'c2))), outputNames = 
"explode" :: Nil)
+.select('a, 'c1, 'explode)
 .analyze
 
 val optimized = Optimize.execute(query)
 
 val correctAnswer =
   input
-.generate(Explode('b), join = false, outputNames = "explode" :: 
Nil)
-.select(('explode + 1).as("result"))
+.select('a, 'c1, 'c2)
+.generate(Explode(CreateArray(Seq('c1, 'c2))),
+  unrequiredChildOutput = input.select('c2).analyze.output,
--- End diff --

ditto


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158845618
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 ---
@@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest {
   CollapseProject) :: Nil
   }
 
-  test("Column pruning for Generate when Generate.join = false") {
-val input = LocalRelation('a.int, 'b.array(StringType))
+  test("Column pruning for Generate when Generate.unrequiredChildOutput = 
Nil ") {
+val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
 
-val query = input.generate(Explode('b), join = false).analyze
+val query =
+  input
+.generate(Explode('c), outputNames = "explode" :: Nil)
+.select('a, 'explode)
+.analyze
 
 val optimized = Optimize.execute(query)
 
-val correctAnswer = input.select('b).generate(Explode('b), join = 
false).analyze
+val correctAnswer =
+  input
+.select('a, 'c)
+.generate(Explode('c), unrequiredChildOutput = 
input.select('c).analyze.references.toSeq,
+  outputNames = "explode" :: Nil)
+.select('a, 'explode)
+.analyze
 
 comparePlans(optimized, correctAnswer)
   }
 
-  test("Column pruning for Generate when Generate.join = true") {
-val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
+  test("Fill Generate.unrequiredChildOutput if possible") {
+val input = LocalRelation('b.array(StringType))
 
 val query =
   input
-.generate(Explode('c), join = true, outputNames = "explode" :: Nil)
-.select('a, 'explode)
+.generate(Explode('b), outputNames = "explode" :: Nil)
+.select(('explode + 1).as("result"))
 .analyze
 
 val optimized = Optimize.execute(query)
 
 val correctAnswer =
   input
-.select('a, 'c)
-.generate(Explode('c), join = true, outputNames = "explode" :: Nil)
-.select('a, 'explode)
+.generate(Explode('b), unrequiredChildOutput = 
input.references.toSeq,
--- End diff --

input.output


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158845516
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 ---
@@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest {
   CollapseProject) :: Nil
   }
 
-  test("Column pruning for Generate when Generate.join = false") {
-val input = LocalRelation('a.int, 'b.array(StringType))
+  test("Column pruning for Generate when Generate.unrequiredChildOutput = 
Nil ") {
+val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
 
-val query = input.generate(Explode('b), join = false).analyze
+val query =
+  input
+.generate(Explode('c), outputNames = "explode" :: Nil)
+.select('a, 'explode)
--- End diff --

To match the test name, we should refer to `c` here, and check whether the 
optimizer can prune `a` and `b` without changing `unrequiredChildOutput`.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158845436
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
-case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
-  g.copy(child = prunedChild(g.child, g.references))
 
-// Turn off `join` for Generate if no column from it's child is used
-case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
-  p.copy(child = g.copy(join = false))
+// prune unrequired references
+case g : Generate
+  if (AttributeSet(g.unrequiredChildOutput) -- 
g.generator.references).nonEmpty =>
+g.copy(child = prunedChild(g.child,
+g.child.outputSet -- g.unrequiredChildOutput ++ 
g.generator.references),
+   unrequiredChildOutput = g.generator.references.toSeq)
+
+// Sync Generate's unrequiredChildOutput with the actual needed outputs
+case p @ Project(_, g: Generate) =>
--- End diff --

Seems like we can merge this case with the above case, as column pruning only works if there is a `Project` above `Generate`:
```
case p @ Project(_, g: Generate) =>
  if (p.references == g.outputSet) {
    p
  } else {
    val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
    val newChild = prunedChild(g.child, requiredAttrs)
    p.copy(child = g.copy(child = newChild, unrequiredChildOutput = (g.generator.references -- p.references).toSeq))
  }
```


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158843095
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 ---
@@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest {
   CollapseProject) :: Nil
   }
 
-  test("Column pruning for Generate when Generate.join = false") {
-val input = LocalRelation('a.int, 'b.array(StringType))
+  test("Column pruning for Generate when Generate.unrequiredChildOutput = 
Nil ") {
+val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
 
-val query = input.generate(Explode('b), join = false).analyze
+val query =
+  input
+.generate(Explode('c), outputNames = "explode" :: Nil)
+.select('a, 'explode)
+.analyze
 
 val optimized = Optimize.execute(query)
 
-val correctAnswer = input.select('b).generate(Explode('b), join = 
false).analyze
+val correctAnswer =
+  input
+.select('a, 'c)
+.generate(Explode('c), unrequiredChildOutput = 
input.select('c).analyze.references.toSeq,
--- End diff --

just `unrequiredChildOutput = input.output(2)`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158842896
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -83,15 +85,17 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalPlan) extend
  */
 case class Generate(
 generator: Generator,
-join: Boolean,
+unrequiredChildOutput: Seq[Attribute],
--- End diff --

shall we add an assert to make sure `unrequiredChildOutput` are always 
resolved?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158842853
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -83,15 +85,17 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalPlan) extend
  */
 case class Generate(
 generator: Generator,
-join: Boolean,
+unrequiredChildOutput: Seq[Attribute],
 outer: Boolean,
 qualifier: Option[String],
 generatorOutput: Seq[Attribute],
 child: LogicalPlan)
   extends UnaryNode {
 
-  /** The set of all attributes produced by this node. */
-  def generatedSet: AttributeSet = AttributeSet(generatorOutput)
+  def requiredChildOutput(): Seq[Attribute] = {
--- End diff --

use `lazy val` can save some re-computing.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158842777
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
-case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
-  g.copy(child = prunedChild(g.child, g.references))
 
-// Turn off `join` for Generate if no column from it's child is used
-case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
-  p.copy(child = g.copy(join = false))
+// prune unrequired references
+case g : Generate
+  if (AttributeSet(g.unrequiredChildOutput) -- 
g.generator.references).nonEmpty =>
+g.copy(child = prunedChild(g.child,
+g.child.outputSet -- g.unrequiredChildOutput ++ 
g.generator.references),
+   unrequiredChildOutput = g.generator.references.toSeq)
--- End diff --

This looks pretty complicated, how about
```
case g: Generate if (g.child.outputSet -- g.references -- 
g.requiredChildOutput).nonEmpty =>
  g.copy(child = prunedChild(g.child, g.references))
```
The logic is pretty simple: if the child output has something that is not 
referenced by the generator or by the parent, prune the child.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158814175
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  * their output.
  *
  * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the input tuple that produced
- *  it.
+ * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the
--- End diff --

Generally I agree, but in this case we will have to "fill" it in a few 
places when we initialise the object instead of starting it with Nil sequence. 
this way all the work is done entirely in the Optimizer.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158781421
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  * their output.
  *
  * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the input tuple that produced
- *  it.
+ * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the
--- End diff --

Is it mandatory to use negative ones? `requiredChildOutput` seems more 
straightforward. 


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158751974
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -57,19 +59,15 @@ private[execution] sealed case class LazyIterator(func: 
() => TraversableOnce[In
  */
 case class GenerateExec(
 generator: Generator,
-join: Boolean,
+unrequiredChildOutput: Seq[Attribute],
 outer: Boolean,
 generatorOutput: Seq[Attribute],
 child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {
 
-  override def output: Seq[Attribute] = {
-if (join) {
-  child.output ++ generatorOutput
-} else {
-  generatorOutput
-}
-  }
+  private def requiredChildOutput() = 
child.output.filterNot(unrequiredChildOutput.contains)
--- End diff --

This is `O(n^2)`, maybe
```
private def requiredChildOutput() = {
  val unrequiredSet = AttributeSet(unrequiredChildOutput)
  child.output.filterNot(unrequiredSet)
}
```
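
A toy, self-contained illustration of the complexity point, with plain Scala collections standing in for `Seq[Attribute]` and `AttributeSet`:
```
object MembershipSketch extends App {
  val childOutput = (1 to 10000).map(i => s"attr_$i")   // stands in for child.output
  val unrequiredChildOutput = childOutput.take(5000)    // stands in for unrequiredChildOutput

  // Seq.contains scans the whole sequence for every attribute: O(n * m)
  val slow = childOutput.filterNot(unrequiredChildOutput.contains)

  // building a set once makes each membership test effectively constant time
  val unrequiredSet = unrequiredChildOutput.toSet
  val fast = childOutput.filterNot(unrequiredSet)

  assert(slow == fast)
}
```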


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158751868
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -276,22 +276,28 @@ class PlanParserSuite extends AnalysisTest {
 assertEqual(
   "select * from t lateral view explode(x) expl as x",
   table("t")
-.generate(explode, join = true, outer = false, Some("expl"), 
Seq("x"))
+.generate(explode, unrequiredChildOutput = Nil, outer = false,
+  Some("expl"), Seq("x"))
 .select(star()))
 
 // Multiple lateral views
+val exploded = table("t")
+  .generate(explode, unrequiredChildOutput = Nil, outer = false,
--- End diff --

ditto


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158751828
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 ---
@@ -38,54 +38,69 @@ class ColumnPruningSuite extends PlanTest {
   CollapseProject) :: Nil
   }
 
-  test("Column pruning for Generate when Generate.join = false") {
-val input = LocalRelation('a.int, 'b.array(StringType))
+  test("Column pruning for Generate when Generate.unrequiredChildOutput = 
Nil") {
+val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
 
-val query = input.generate(Explode('b), join = false).analyze
+val query =
+  input
+.generate(Explode('c), unrequiredChildOutput = Nil, outputNames = 
"explode" :: Nil)
--- End diff --


https://github.com/apache/spark/pull/19683/files#diff-f475ef52a0c0bf5800a1b0cbc5420876R362
 `unrequiredChildOutput` is `Nil` by default; we don't need to set it here.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158751840
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -628,14 +628,14 @@ class FilterPushdownSuite extends PlanTest {
   test("generate: predicate referenced no generated column") {
 val originalQuery = {
   testRelationWithArrayType
-.generate(Explode('c_arr), true, false, Some("arr"))
+.generate(Explode('c_arr), unrequiredChildOutput = Nil, false, 
Some("arr"))
 .where(('b >= 5) && ('a > 6))
 }
 val optimized = Optimize.execute(originalQuery.analyze)
 val correctAnswer = {
   testRelationWithArrayType
 .where(('b >= 5) && ('a > 6))
-.generate(Explode('c_arr), true, false, Some("arr")).analyze
+.generate(Explode('c_arr), unrequiredChildOutput = Nil, false, 
Some("arr")).analyze
--- End diff --

ditto


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158751759
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 ---
@@ -38,54 +38,69 @@ class ColumnPruningSuite extends PlanTest {
   CollapseProject) :: Nil
   }
 
-  test("Column pruning for Generate when Generate.join = false") {
-val input = LocalRelation('a.int, 'b.array(StringType))
+  test("Column pruning for Generate when Generate.unrequiredChildOutput = 
Nil") {
--- End diff --

`join=false` means `unrequiredChildOutput=child.output`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158751546
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
-case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
-  g.copy(child = prunedChild(g.child, g.references))
 
-// Turn off `join` for Generate if no column from it's child is used
-case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
-  p.copy(child = g.copy(join = false))
+// Sync Generate's unrequiredChildOutput with the actual needed outputs
+case p @ Project(_, g: Generate) =>
+  val actualUnrequired = g.child.outputSet -- p.references
+  if (actualUnrequired == AttributeSet(g.unrequiredChildOutput)) {
+p
+  } else {
+p.copy(child = g.copy(unrequiredChildOutput = 
actualUnrequired.toSeq))
+  }
+
+// prune unrequired references
+case g : Generate
--- End diff --

or merge this with `case p @ Project(_, g: Generate)`
```
case p @ Project(_, g: Generate) =>
  val unrequiredByGenerator = g.child.outputSet -- g.references
  val unrequiredByProj = g.child.outputSet -- p.references
  if (unrequiredByGenerator.isEmpty && unrequiredByProj.isEmpty) {
p
  } else {
p.copy(child = g.copy(unrequiredChildOutput = unrequiredByProj.toSeq, 
child = prunedChild(g.child, g.references)))
  }
```


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158751339
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
-case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
-  g.copy(child = prunedChild(g.child, g.references))
 
-// Turn off `join` for Generate if no column from it's child is used
-case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
-  p.copy(child = g.copy(join = false))
+// Sync Generate's unrequiredChildOutput with the actual needed outputs
+case p @ Project(_, g: Generate) =>
+  val actualUnrequired = g.child.outputSet -- p.references
+  if (actualUnrequired == AttributeSet(g.unrequiredChildOutput)) {
+p
+  } else {
+p.copy(child = g.copy(unrequiredChildOutput = 
actualUnrequired.toSeq))
+  }
+
+// prune unrequired references
+case g : Generate
--- End diff --

We may never reach here if there is a `Project(_, g: Generate)`.

We should keep the previous order, move `case g : Generate` before `case p @ Project(_, g: Generate)`, and do
```
case g: Generate if g.requiredChildOutput.isEmpty && (g.child.outputSet -- g.references).nonEmpty =>
  ...
```
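
In full, the suggested ordering might read as follows (bodies taken from the versions of the rule quoted in this thread):
```
// prune unrequired references first, while requiredChildOutput is still empty
case g: Generate if g.requiredChildOutput.isEmpty && (g.child.outputSet -- g.references).nonEmpty =>
  g.copy(child = prunedChild(g.child, g.references))

// then sync Generate's unrequiredChildOutput with the actually needed outputs
case p @ Project(_, g: Generate) =>
  val actualUnrequired = g.child.outputSet -- p.references
  if (actualUnrequired == AttributeSet(g.unrequiredChildOutput)) {
    p
  } else {
    p.copy(child = g.copy(unrequiredChildOutput = actualUnrequired.toSeq))
  }
```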


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158751132
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -608,8 +608,8 @@ trait CheckAnalysis extends PredicateHelper {
   // allows to have correlation under it
   // but must not host any outer references.
   // Note:
-  // Generator with join=false is treated as Category 4.
-  case g: Generate if g.join =>
+  // Generator with requiredChildOutput.isEmpty is treated as Category 4.
--- End diff --

The comment is wrong


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158751010
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1605,7 +1607,8 @@ class Analyzer(
 resolvedGenerator =
   Generate(
 generator,
-join = projectList.size > 1, // Only join if there are 
other expressions in SELECT.
+// Only unrequiredChildOutput if there are other 
expressions in SELECT.
--- End diff --

`unrequiredChildOutput=Nil if there are other expressions in SELECT.`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158750949
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -695,7 +695,7 @@ class Analyzer(
   (oldVersion, oldVersion.copy(aggregateExpressions = 
newAliases(aggregateExpressions)))
 
 case oldVersion: Generate
-if 
oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
+if 
oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
--- End diff --

ah i see


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158737578
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -846,12 +846,13 @@ class Analyzer(
   // ResolveReferences. Attributes in the output will be resolved by 
ResolveGenerate.
   case g @ Generate(generator, _, _, _, _, _) if generator.resolved => 
g
 
-  case g @ Generate(generator, join, outer, qualifier, output, child) 
=>
+  case g @ Generate(generator, requiredChildOutput, outer, qualifier, 
output, child) =>
 val newG = resolveExpression(generator, child, throws = true)
 if (newG.fastEquals(generator)) {
   g
 } else {
-  Generate(newG.asInstanceOf[Generator], join, outer, qualifier, 
output, child)
+  Generate(newG.asInstanceOf[Generator], child.output, outer,
--- End diff --

agree


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158737560
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -695,7 +695,7 @@ class Analyzer(
   (oldVersion, oldVersion.copy(aggregateExpressions = 
newAliases(aggregateExpressions)))
 
 case oldVersion: Generate
-if 
oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
+if 
oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
--- End diff --

well, the implementation was identical in class Generate:
  def generatedSet: AttributeSet = AttributeSet(generatorOutput)
  override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
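
As a rough, shell-pasteable illustration of why the two calls give the same result here (Catalyst internals; the attributes below are made up):
```
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
import org.apache.spark.sql.types.IntegerType

// Both generatedSet and producedAttributes wrap the same generatorOutput in an
// AttributeSet, and AttributeSet membership is keyed on exprId, so intersecting
// either of them with conflictingAttributes behaves identically.
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val produced = AttributeSet(Seq(a, b))
val conflicting = AttributeSet(Seq(a))
assert(produced.intersect(conflicting).contains(a))
assert(!produced.intersect(conflicting).contains(b))
```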


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158717309
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -623,7 +623,7 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 val expressions = expressionList(ctx.expression)
 Generate(
   UnresolvedGenerator(visitFunctionName(ctx.qualifiedName), 
expressions),
-  join = true,
+  requiredChildOutput = query.references.toSeq,
--- End diff --

maybe `query.output`?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158717147
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -444,12 +444,14 @@ object ColumnPruning extends Rule[LogicalPlan] {
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
-case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
+case g: Generate
+  if g.requiredChildOutput.isEmpty && (g.child.outputSet -- 
g.references).nonEmpty =>
   g.copy(child = prunedChild(g.child, g.references))
 
-// Turn off `join` for Generate if no column from it's child is used
-case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
-  p.copy(child = g.copy(join = false))
+// Sync Generate's requiredChildOutput with the actual needed outputs
+case p @ Project(_, g: Generate)
+  if (AttributeSet(g.requiredChildOutput) -- 
(p.references--g.producedAttributes)).nonEmpty =>
+  p.copy(child = g.copy(requiredChildOutput = 
(p.references--g.producedAttributes).toSeq))
--- End diff --

we can simplify this
```
case p @ Project(_, g: Generate) =>
  val actualRequired = p.references -- g.producedAttributes
  if (AttributeSet(g.requiredChildOutput) == actualRequired) {
    p
  } else {
    p.copy(child = g.copy(requiredChildOutput = actualRequired.toSeq))
  }
```


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158716646
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1151,7 +1152,8 @@ class Analyzer(
   // If join is false, we will convert it to true for getting from 
the child the missing
   // attributes that its child might have or could have.
   val missing = missingAttrs -- g.child.outputSet
-  g.copy(join = true, child = addMissingAttr(g.child, missing))
+  g.copy(requiredChildOutput = g.child.outputSet.toSeq,
--- End diff --

nit: `requiredChildOutput = g.child.output`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158716569
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -846,12 +846,13 @@ class Analyzer(
   // ResolveReferences. Attributes in the output will be resolved by 
ResolveGenerate.
   case g @ Generate(generator, _, _, _, _, _) if generator.resolved => 
g
 
-  case g @ Generate(generator, join, outer, qualifier, output, child) 
=>
+  case g @ Generate(generator, requiredChildOutput, outer, qualifier, 
output, child) =>
 val newG = resolveExpression(generator, child, throws = true)
 if (newG.fastEquals(generator)) {
   g
 } else {
-  Generate(newG.asInstanceOf[Generator], join, outer, qualifier, 
output, child)
+  Generate(newG.asInstanceOf[Generator], child.output, outer,
--- End diff --

we should preserve `requiredChildOutput` here.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158716496
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -695,7 +695,7 @@ class Analyzer(
   (oldVersion, oldVersion.copy(aggregateExpressions = 
newAliases(aggregateExpressions)))
 
 case oldVersion: Generate
-if 
oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
+if 
oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
--- End diff --

Is this correct? `AttributeSet.intersect` is special.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158716207
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -115,7 +113,8 @@ case class Generate(
   }
 
   def output: Seq[Attribute] = {
-if (join) child.output ++ qualifiedGeneratorOutput else 
qualifiedGeneratorOutput
+if (requiredChildOutput.nonEmpty) child.output ++ 
qualifiedGeneratorOutput
--- End diff --

This should be `requiredChildOutput ++ qualifiedGeneratorOutput`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-22 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158552882
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -451,6 +451,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
 case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
   p.copy(child = g.copy(join = false))
 
+// Turn on `omitGeneratorReferences` for Generate if its child column 
is not used
+case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, 
_, _, _))
+  if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty =>
--- End diff --

sounds reasonable. let me try that.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158500070
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -451,6 +451,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
 case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
   p.copy(child = g.copy(join = false))
 
+// Turn on `omitGeneratorReferences` for Generate if its child column 
is not used
+case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, 
_, _, _))
+  if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty =>
--- End diff --

Actually we can do better: the project may only need part of the generator references. How about we add a new parameter, `requiredChildOutputs: Seq[Attribute]`, to replace both `join` and `omitGeneratorReferences`?
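
A rough sketch of the node shape being proposed, heavily simplified (hypothetical class name, fields reduced, and extra overrides omitted; depending on the Spark version more tree-node plumbing would be needed):
```
import org.apache.spark.sql.catalyst.expressions.{Attribute, Generator}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

// Hypothetical, simplified variant of Generate: the boolean `join` flag is replaced
// by an explicit list of child attributes that must survive the generate.
case class GenerateSketch(
    generator: Generator,
    requiredChildOutput: Seq[Attribute],
    outer: Boolean,
    generatorOutput: Seq[Attribute],
    child: LogicalPlan) extends UnaryNode {

  // Only the child columns that downstream operators still need are forwarded,
  // together with the generator's own output.
  override def output: Seq[Attribute] = requiredChildOutput ++ generatorOutput
}
```
ColumnPruning would then keep `requiredChildOutput` in sync with what the parent `Project` actually references, as discussed in the other threads of this review.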


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158499618
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -451,6 +451,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
 case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
   p.copy(child = g.copy(join = false))
 
+// Turn on `omitGeneratorReferences` for Generate if its child column 
is not used
+case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, 
_, _, _))
+  if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty =>
--- End diff --

yea, this should be
```
case p @ Project(_, g @ Generate(generator, join @ true, _, omitGeneratorRefs @ false, _, _, _))
    if p.references.intersect(generator.references).isEmpty =>
  ...
```


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-22 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158488942
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -451,6 +451,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
 case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
   p.copy(child = g.copy(join = false))
 
+// Turn on `omitGeneratorReferences` for Generate if its child column 
is not used
+case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, 
_, _, _))
+  if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty =>
--- End diff --

@cloud-fan should I also change `gu.child` to `gu.references` here?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158436989
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -474,10 +474,10 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 execution.GlobalLimitExec(limit, planLater(child)) :: Nil
   case logical.Union(unionChildren) =>
 execution.UnionExec(unionChildren.map(planLater)) :: Nil
-  case g @ logical.Generate(generator, join, outer, _, _, child) =>
+  case g @ logical.Generate(generator, join, outer, 
omitGeneratorChild, _, _, child) =>
 execution.GenerateExec(
-  generator, join = join, outer = outer, 
g.qualifiedGeneratorOutput,
-  planLater(child)) :: Nil
+  generator, join = join, outer = outer, omitGeneratorChild = 
omitGeneratorChild,
--- End diff --

`omitGeneratorChild = omitGeneratorChild` looks redundant; we can just pass `omitGeneratorChild`.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158436905
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 ---
@@ -227,4 +227,31 @@ class MiscBenchmark extends BenchmarkBase {
 generate stack wholestage on   836 /  847 20.1 
 49.8  15.5X
  */
   }
+
+  ignore("generate explode big struct array") {
+val BASE = 1234567890
+val N = 4
+
+val spark = sparkSession
+import spark.implicits._
+import org.apache.spark.sql.functions._
+
+val df = sparkSession.sparkContext.parallelize(
+  List(("1234567890", (BASE to (BASE + N)).map(
+x => (x.toString, (x + 1).toString, (x + 2).toString, (x + 
3).toString)).toArray)))
+  .toDF("c1", "c_arr")
+  .select(col("c1"), explode(col("c_arr")))
+  .selectExpr("c1", "col.*")
+
+df.cache.count
--- End diff --

can you follow the existing benchmark style in this file?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158436824
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 ---
@@ -227,4 +227,31 @@ class MiscBenchmark extends BenchmarkBase {
 generate stack wholestage on   836 /  847 20.1 
 49.8  15.5X
  */
   }
+
+  ignore("generate explode big struct array") {
+val BASE = 1234567890
--- End diff --

why do we need this?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158436801
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -86,10 +96,24 @@ case class GenerateExec(
 child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
   val generatorNullRow = new 
GenericInternalRow(generator.elementSchema.length)
   val rows = if (join) {
+
+lazy val project = UnsafeProjection.create(
+  projectedChildOutput,
+  child.output,
+  subexpressionEliminationEnabled)
+
 val joinedRow = new JoinedRow
 iter.flatMap { row =>
+
+  val projectedRow = if (omitGeneratorChild) {
+project.initialize(index)
+project(row)
+  } else {
+row
+  }
+
   // we should always set the left (child output)
-  joinedRow.withLeft(row)
+  joinedRow.withLeft(projectedRow)
--- End diff --

here, just `joinedRow.withLeft(pruneChildForResult(row))`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158436751
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -86,10 +96,24 @@ case class GenerateExec(
 child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
   val generatorNullRow = new 
GenericInternalRow(generator.elementSchema.length)
   val rows = if (join) {
+
+lazy val project = UnsafeProjection.create(
+  projectedChildOutput,
+  child.output,
+  subexpressionEliminationEnabled)
--- End diff --

we can simplify the code and centralize all the logic here
```
val pruneChildForResult: InternalRow => InternalRow = if (omitGeneratorReferences) {
  val generatorReferences = generator.references
  val requiredChildOutput = child.output.filterNot(generatorReferences.contains)
  UnsafeProjection.create(requiredChildOutput, child.output)
} else {
  identity
}
```
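
As a toy, shell-pasteable illustration of what such a pruning projection does (Catalyst internals; the attributes and values are made up):
```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
import org.apache.spark.sql.types.IntegerType

// The child produces (a, b); only `b` is still needed downstream, so the
// projection drops `a` before the row is joined with the generated rows.
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val childOutput = Seq(a, b)
val prune = UnsafeProjection.create(Seq(b), childOutput)
assert(prune(InternalRow(1, 2)).getInt(0) == 2)
```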


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158436519
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -86,10 +96,24 @@ case class GenerateExec(
 child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
   val generatorNullRow = new 
GenericInternalRow(generator.elementSchema.length)
   val rows = if (join) {
+
+lazy val project = UnsafeProjection.create(
+  projectedChildOutput,
+  child.output,
+  subexpressionEliminationEnabled)
--- End diff --

no need to set this. It's a very simple projection that only deals with attributes.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158436309
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -59,13 +61,21 @@ case class GenerateExec(
 generator: Generator,
 join: Boolean,
 outer: Boolean,
+omitGeneratorChild: Boolean,
 generatorOutput: Seq[Attribute],
 child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {
 
+  private def projectedChildOutput = generator match {
+case g: UnaryExpression if omitGeneratorChild =>
--- End diff --

why limit to `UnaryExpression`? Think about an array-concat function we might add in the future: when we do `explode(array_concat(col1, col2))`, we should be able to omit both `col1` and `col2`.

I'd like to add an `omitGeneratorReferences` parameter, and then this can be simplified to
```
private def requiredChildOutput = if (omitGeneratorReferences) {
  val generatorReferences = generator.references
  child.output.filterNot(generatorReferences.contains)
} else {
  child.output
}
```
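
One concrete query shape where a single `g.child` check falls short, using only functions that already exist (spark-shell sketch; column names are made up):
```
// The generator input references both c1 and c2, and neither column is selected
// afterwards, so ideally both could be omitted from the generated rows -- something
// a UnaryExpression/`g.child` check cannot express.
import org.apache.spark.sql.functions.{concat_ws, explode, split}
import spark.implicits._

val df = Seq(("a,b", "c,d")).toDF("c1", "c2")
df.select(explode(split(concat_ws(",", $"c1", $"c2"), ",")).as("x")).show()
```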


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-11-29 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r153998263
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -450,6 +450,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
 case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
   p.copy(child = g.copy(join = false))
 
+// Turn on `omitGeneratorChild` for Generate if it's child column is 
not used
+case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, 
_, _, _))
+  if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty =>
--- End diff --

doesn't compile:
`Type mismatch, expected: NamedExpression, actual: Expression`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-11-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r153944758
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -450,6 +450,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
 case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
   p.copy(child = g.copy(join = false))
 
+// Turn on `omitGeneratorChild` for Generate if it's child column is 
not used
+case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, 
_, _, _))
+  if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty =>
--- End diff --

`p.references.contains(gu.child)`?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-11-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r153944787
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -450,6 +450,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
 case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
   p.copy(child = g.copy(join = false))
 
+// Turn on `omitGeneratorChild` for Generate if it's child column is 
not used
--- End diff --

nit: its


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-11-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r153945541
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -59,13 +61,21 @@ case class GenerateExec(
 generator: Generator,
 join: Boolean,
 outer: Boolean,
+omitGeneratorChild: Boolean,
 generatorOutput: Seq[Attribute],
 child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {
 
+  private def projectedChildOutput = generator match {
+case g: UnaryExpression if omitGeneratorChild =>
+  child.output diff Seq(g.child)
--- End diff --

nit: `.diff(...)`


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-11-19 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r151868042
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -59,15 +61,23 @@ case class GenerateExec(
 generator: Generator,
 join: Boolean,
 outer: Boolean,
+omitGeneratorChild: Boolean,
 generatorOutput: Seq[Attribute],
 child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {
 
+  private def projectedChildOutput = generator match {
+case g: UnaryExpression if omitGeneratorChild =>
+  (child.output diff Seq(g.child))
+case _ =>
+  child.output
+  }
+
   override def output: Seq[Attribute] = {
 if (join) {
-  child.output ++ generatorOutput
-} else {
-  generatorOutput
+  projectedChildOutput ++ generatorOutput
+  } else {
--- End diff --

fixed.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-11-18 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r151855849
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -59,15 +61,23 @@ case class GenerateExec(
 generator: Generator,
 join: Boolean,
 outer: Boolean,
+omitGeneratorChild: Boolean,
 generatorOutput: Seq[Attribute],
 child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {
 
+  private def projectedChildOutput = generator match {
+case g: UnaryExpression if omitGeneratorChild =>
+  (child.output diff Seq(g.child))
+case _ =>
+  child.output
+  }
+
   override def output: Seq[Attribute] = {
 if (join) {
-  child.output ++ generatorOutput
-} else {
-  generatorOutput
+  projectedChildOutput ++ generatorOutput
+  } else {
--- End diff --

nit: do we need to update the indentation?


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-11-07 Thread uzadude
GitHub user uzadude opened a pull request:

https://github.com/apache/spark/pull/19683

[SPARK-21657][SQL] optimize explode quadratic memory consumption

## What changes were proposed in this pull request?

The issue has been raised in two JIRA tickets: [SPARK-21657](https://issues.apache.org/jira/browse/SPARK-21657) and [SPARK-16998](https://issues.apache.org/jira/browse/SPARK-16998). In collection generators like explode/inline we create many rows from each input row, and currently each generated row also carries the column it was created from. If one row holds a 10k-element array, for example, that array is copied into each of the 10k generated rows, which results in quadratic memory consumption. However, it is common for the original column to be projected out right after the explode, so we can avoid duplicating it.
In this solution we propose to identify this situation in the optimizer and turn on a flag that omits the original column during generation.
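
A minimal sketch that reproduces the pattern described above (array size and names are illustrative; this is not the benchmark added in this PR):
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object ExplodeMemoryRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explode-memory-repro").getOrCreate()
    import spark.implicits._

    // One row holding a large array: without the optimization, every exploded row
    // also carries the full `arr` column even though only the element is used.
    val df = Seq(("key", (1 to 100000).toArray)).toDF("key", "arr")
    df.select($"key", explode($"arr").as("v")).count()

    spark.stop()
  }
}
```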

## How was this patch tested?

1. We added a benchmark test to MiscBenchmark that shows a 16x improvement in runtime.
2. We ran some of the other tests in MiscBenchmark and they show roughly 15% improvements.
3. We ran this code on a specific case from our production data, with rows containing arrays of ~200k elements, and it reduced the runtime from 6 hours to 3 minutes.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uzadude/spark optimize_explode

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19683.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19683


commit ce7c3694a99584348957dc756234bb667466be4e
Author: oraviv 
Date:   2017-11-07T11:34:21Z

[SPARK-21657][SQL] optimize explode quadratic memory consumpation




---
