[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19683

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r159035379

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala ---
    @@ -227,4 +228,45 @@ class MiscBenchmark extends BenchmarkBase {
         generate stack wholestage on 836 / 847 20.1 49.8 15.5X
          */
       }
    +
    +  ignore("generate explode big struct array") {
    +    val N = 6
    +
    +    val spark = sparkSession
    +    import org.apache.spark.sql.functions._
    +    import spark.implicits._
    +
    +    val df = sparkSession.sparkContext.parallelize(Seq(("1",
    +      Array.fill(N)({
    +        val i = math.random
    +        (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
    +      }.toDF("col", "arr")
    +
    +    runBenchmark("generate big struct array", N) {
    +      df.withColumn("arr_col", explode('arr)).select("col", "arr_col.*").count
    +    }
    +
    +  }
    +
    +/*
    --- End diff --

    This benchmark result should be moved up to be included in the ignored test block.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r159035248

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala ---
    @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.benchmark
     import org.apache.spark.util.Benchmark
    +
    --- End diff --

    nit: unnecessary blank line.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r159035178

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -73,8 +73,13 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
      * their output.
      *
      * @param generator the generator expression
    - * @param join  when true, each output row is implicitly joined with the input tuple that produced
    - *              it.
    + * @param unrequiredChildIndex this paramter starts as Nil and gets filled by the Optimizer.
    + *                              It's used as an optimization for omitting data generation that will
    + *                              be discarded next by a projection.
    + *                              A common use case is when we explode(array(..)) and are interested
    + *                              only in the exploded data and not in the original array. before this
    + *                              optimization the array got duplicated for each of its elements,
    + *                              causing O(n^^2) memory consumption. (see [SPARK-21657])
    --- End diff --

    nit: seems an extra space since 2nd line.
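To make the quadratic cost named in the doc comment concrete, here is a minimal plain-Scala sketch with no Spark involved. It counts cells as if every output row materialized its own copy of the input array, which is an assumption made purely for illustration. The names `ExplodeCost`, `joinedCells`, and `prunedCells` are hypothetical and do not appear in the PR.

```scala
// Illustrative model only: plain collections stand in for rows.
object ExplodeCost {
  // Row layout before the optimization: (whole input array, one exploded element).
  // Each of the n output rows is charged n cells for the array plus 1 for the element.
  def joinedCells(n: Int): Long = {
    val arr = Vector.range(0, n)
    val rows = arr.map(elem => (arr, elem))
    rows.map { case (a, _) => a.length.toLong + 1L }.sum
  }

  // Row layout once the unused array column is dropped: just the exploded element.
  def prunedCells(n: Int): Long = {
    val arr = Vector.range(0, n)
    arr.map(_ => 1L).sum
  }
}
```

Under this model, an array of 1000 elements accounts for 1,001,000 cells in the joined layout against 1,000 in the pruned one.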
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r159033662

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
    @@ -85,11 +84,19 @@ case class GenerateExec(
         val numOutputRows = longMetric("numOutputRows")
         child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
           val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
    -      val rows = if (join) {
    +      val rows = if (requiredChildOutput.nonEmpty) {
    +
    +        val pruneChildForResult: InternalRow => InternalRow =
    +          if ((child.outputSet -- requiredChildOutput).isEmpty) {
    --- End diff --

    wouldn't it always return false? or should I use `child.output == AttributeSet(requiredChildOutput)`
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r159033021

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
    @@ -85,11 +84,19 @@ case class GenerateExec(
         val numOutputRows = longMetric("numOutputRows")
         child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
           val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
    -      val rows = if (join) {
    +      val rows = if (requiredChildOutput.nonEmpty) {
    +
    +        val pruneChildForResult: InternalRow => InternalRow =
    +          if ((child.outputSet -- requiredChildOutput).isEmpty) {
    --- End diff --

    just `child.output == requiredChildOutput`?
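The two checks under discussion differ in whether column order matters. A minimal sketch of that difference follows, with plain strings standing in for attributes. This is an assumption: Spark's `Attribute` and `AttributeSet` have their own equality semantics, which this sketch does not model. `OutputComparison`, `sameSeq`, and `nothingToPrune` are hypothetical names.

```scala
// Illustrative model only: column names as strings, no Spark types.
object OutputComparison {
  // Order-sensitive: true only when both sequences list the same names in the same order.
  def sameSeq(childOutput: Seq[String], required: Seq[String]): Boolean =
    childOutput == required

  // Order-insensitive: true when every child column also appears in `required`,
  // i.e. there is nothing left to prune.
  def nothingToPrune(childOutput: Seq[String], required: Seq[String]): Boolean =
    (childOutput.toSet -- required).isEmpty
}
```

With `Seq("a", "b")` as the child output and `Seq("b", "a")` as the required columns, `sameSeq` is false while `nothingToPrune` is true, so the two conditions are not interchangeable when the order can differ.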
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r159032990

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
    @@ -47,8 +47,13 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
      * terminate().
      *
      * @param generator the generator expression
    - * @param join  when true, each output row is implicitly joined with the input tuple that produced
    - *              it.
    + * @param requiredChildOutput this paramter starts as Nil and gets filled by the Optimizer.
    --- End diff --

    we don't need to duplicate the comment here, just say `required attributes from child output`
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r159031802

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
    @@ -57,20 +62,19 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
      */
     case class GenerateExec(
         generator: Generator,
    -    join: Boolean,
    +    unrequiredChildIndex: Seq[Int],
    --- End diff --

    The physical plan can just take `requiredChildOutput`, and in the planner we can just do
    ```
    case g @ logical.Generate(...) => GenerateExec(..., g.requiredChildOutput)
    ```
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r159031526

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -73,25 +73,32 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
      * their output.
      *
      * @param generator the generator expression
    - * @param join  when true, each output row is implicitly joined with the input tuple that produced
    - *              it.
    + * @param unrequiredChildIndex this paramter starts as Nil and gets filled by the Optimizer.
    + *                              It's used as an optimization for omitting data generation that will
    + *                              be discarded next by a projection.
    + *                              A common use case is when we explode(array(..)) and are interested
    + *                              only in the exploded data and not in the original array. before this
    + *                              optimization the array got duplicated for each of its elements,
    + *                              causing O(n^^2) memory consumption. (see [SPARK-21657])
      * @param outer when true, each input row will be output at least once, even if the output of the
      *              given `generator` is empty.
      * @param qualifier Qualifier for the attributes of generator(UDTF)
      * @param generatorOutput The output schema of the Generator.
      * @param child Children logical plan node
      */
     case class Generate(
    -    generator: Generator,
    -    join: Boolean,
    -    outer: Boolean,
    -    qualifier: Option[String],
    -    generatorOutput: Seq[Attribute],
    -    child: LogicalPlan)
    +  generator: Generator,
    +  unrequiredChildIndex: Seq[Int],
    +  outer: Boolean,
    +  qualifier: Option[String],
    +  generatorOutput: Seq[Attribute],
    +  child: LogicalPlan)
    --- End diff --

    wrong indentation?
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158906785

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
      * their output.
      *
      * @param generator the generator expression
    - * @param join  when true, each output row is implicitly joined with the input tuple that produced
    - *              it.
    + * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the
    --- End diff --

    Will try to improve it.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158906660

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---
    @@ -359,12 +359,12 @@ package object dsl {
         def generate(
           generator: Generator,
    -      join: Boolean = false,
    +      unrequiredChildOutput: Seq[Attribute] = Nil,
    --- End diff --

    It looks like it's a mistake. I see it was only used in the test suites, and they all overrode it.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158902856

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
    @@ -85,11 +86,20 @@ case class GenerateExec(
         val numOutputRows = longMetric("numOutputRows")
         child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
           val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
    -      val rows = if (join) {
    +      val rows = if (requiredChildOutput.nonEmpty) {
    +
    +        val pruneChildForResult: InternalRow => InternalRow =
    +          if (unrequiredChildOutput.isEmpty) {
    +            identity
    +          } else {
    +            UnsafeProjection.create(requiredChildOutput, child.output)
    +          }
    +
             val joinedRow = new JoinedRow
             iter.flatMap { row =>
    +          // we should always set the left (child output)
    --- End diff --

    `child output` -> `required child output`
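The `pruneChildForResult` value in this hunk picks a row-to-row function once, up front: pass the row through unchanged when nothing is pruned, otherwise project it onto the required columns. A minimal plain-Scala sketch of that shape follows. It uses `Vector[Any]` for rows and strings for column names, which are assumptions for illustration and not Spark's `InternalRow` or `UnsafeProjection`. `PruneChild` and `pruner` are hypothetical names.

```scala
// Illustrative model only: no Spark types are used.
object PruneChild {
  type Row = Vector[Any]

  // Choose the pruning function once, so the per-row work is a plain function call.
  // Assumes every name in `required` also appears in `childOutput`.
  def pruner(childOutput: Seq[String], required: Seq[String]): Row => Row =
    if (childOutput == required) {
      // Nothing to prune: hand the row back untouched.
      row => row
    } else {
      // Resolve column positions up front, then copy only those positions per row.
      val keep = required.map(name => childOutput.indexOf(name))
      row => keep.map(i => row(i)).toVector
    }
}
```

For example, `PruneChild.pruner(Seq("a", "b", "c"), Seq("a", "c"))` applied to `Vector(1, "x", 3)` keeps the first and third values.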
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158891945

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
      * their output.
      *
      * @param generator the generator expression
    - * @param join  when true, each output row is implicitly joined with the input tuple that produced
    - *              it.
    + * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the
    --- End diff --

    Ok. But it seems the param doc can be improved.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158892688

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---
    @@ -359,12 +359,12 @@ package object dsl {
         def generate(
           generator: Generator,
    -      join: Boolean = false,
    +      unrequiredChildOutput: Seq[Attribute] = Nil,
    --- End diff --

    Previously `join` was false by default. This default `unrequiredChildOutput` value seems to contradict the previous usage. Should we be consistent with before?
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158891168

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala ---
    @@ -227,4 +227,30 @@ class MiscBenchmark extends BenchmarkBase {
         generate stack wholestage on 836 / 847 20.1 49.8 15.5X
          */
       }
    +
    +  ignore("generate explode big struct array") {
    +    val N = 6
    +
    +    val spark = sparkSession
    +    import spark.implicits._
    +    import org.apache.spark.sql.functions._
    +
    +    val df = sparkSession.sparkContext.parallelize(
    --- End diff --

    and put the result like other benchmarks in this file, e.g.
    ```
    /*
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
    Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz

    generate stack:                 Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
    generate stack wholestage off       12953 / 13070         1.3         772.1       1.0X
    generate stack wholestage on            836 / 847        20.1          49.8      15.5X
    */
    ```
    You can run this benchmark without your PR and put the result in PR comment
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158891075

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala ---
    @@ -227,4 +227,30 @@ class MiscBenchmark extends BenchmarkBase {
         generate stack wholestage on 836 / 847 20.1 49.8 15.5X
          */
       }
    +
    +  ignore("generate explode big struct array") {
    +    val N = 6
    +
    +    val spark = sparkSession
    +    import spark.implicits._
    +    import org.apache.spark.sql.functions._
    +
    +    val df = sparkSession.sparkContext.parallelize(
    --- End diff --

    please make it a real benchmark
    ```
    val df = Seq("1", Array.fill(N)(i => (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString))).toDF("col", "arr")
    runBenchmark("generate big struct array", N) {
      df.withColumn("arr_col", explode('arr)).select("col", "arr_col.*").count
    }
    ```
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158890765

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala ---
    @@ -276,22 +276,24 @@ class PlanParserSuite extends AnalysisTest {
         assertEqual(
           "select * from t lateral view explode(x) expl as x",
           table("t")
    -        .generate(explode, join = true, outer = false, Some("expl"), Seq("x"))
    +        .generate(explode, alias = Some("expl"), outputNames = Seq("x"))
             .select(star()))
         // Multiple lateral views
    +    val exploded = table("t")
    +      .generate(explode, alias = Some("expl"))
    +
         assertEqual(
           """select *
             |from t
             |lateral view explode(x) expl
             |lateral view outer json_tuple(x, y) jtup q, z""".stripMargin,
    -      table("t")
    -        .generate(explode, join = true, outer = false, Some("expl"), Seq.empty)
    -        .generate(jsonTuple, join = true, outer = true, Some("jtup"), Seq("q", "z"))
    +      exploded
    +        .generate(jsonTuple, outer = true, alias = Some("jtup"), outputNames = Seq("q", "z"))
    --- End diff --

    and remove
    ```
    val exploded = table("t")
      .generate(explode, alias = Some("expl"))
    ```
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158890738

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala ---
    @@ -276,22 +276,24 @@ class PlanParserSuite extends AnalysisTest {
         assertEqual(
           "select * from t lateral view explode(x) expl as x",
           table("t")
    -        .generate(explode, join = true, outer = false, Some("expl"), Seq("x"))
    +        .generate(explode, alias = Some("expl"), outputNames = Seq("x"))
             .select(star()))
         // Multiple lateral views
    +    val exploded = table("t")
    +      .generate(explode, alias = Some("expl"))
    +
         assertEqual(
           """select *
             |from t
             |lateral view explode(x) expl
             |lateral view outer json_tuple(x, y) jtup q, z""".stripMargin,
    -      table("t")
    -        .generate(explode, join = true, outer = false, Some("expl"), Seq.empty)
    -        .generate(jsonTuple, join = true, outer = true, Some("jtup"), Seq("q", "z"))
    +      exploded
    +        .generate(jsonTuple, outer = true, alias = Some("jtup"), outputNames = Seq("q", "z"))
    --- End diff --

    nit:
    ```
    table("t")
      .generate(explode, alias = Some("expl"))
      .generate(jsonTuple, outer = true, alias = Some("jtup"), outputNames = Seq("q", "z"))
    ```
    to make it more similar to the previous code.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158890659

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala ---
    @@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest {
           CollapseProject) :: Nil
       }
    -  test("Column pruning for Generate when Generate.join = false") {
    -    val input = LocalRelation('a.int, 'b.array(StringType))
    +  test("Column pruning for Generate when Generate.unrequiredChildOutput = Nil") {
    +    val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
    -    val query = input.generate(Explode('b), join = false).analyze
    +    val query =
    +      input
    +        .generate(Explode('c), outputNames = "explode" :: Nil)
    +        .select('c, 'explode)
    +        .analyze
         val optimized = Optimize.execute(query)
    -    val correctAnswer = input.select('b).generate(Explode('b), join = false).analyze
    +    val correctAnswer =
    +      input
    +        .select('c)
    +        .generate(Explode('c), unrequiredChildOutput = Nil,
    --- End diff --

    don't need `unrequiredChildOutput = Nil` as it's the default value
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158890604

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -444,12 +444,17 @@ object ColumnPruning extends Rule[LogicalPlan] {
           f.copy(child = prunedChild(child, f.references))
         case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
           e.copy(child = prunedChild(child, e.references))
    -    case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty =>
    -      g.copy(child = prunedChild(g.child, g.references))
    -    // Turn off `join` for Generate if no column from it's child is used
    -    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
    -      p.copy(child = g.copy(join = false))
    +    // prune unrequired references
    +    case p @ Project(_, g: Generate) =>
    +      if (p.references == g.outputSet) {
    +        p
    +      } else {
    +        val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
    +        val newChild = prunedChild(g.child, requiredAttrs)
    +        p.copy(child = g.copy(child = newChild,
    +          unrequiredChildOutput = (g.generator.references -- p.references).toSeq))
    --- End diff --

    a small improvement to allow us to still hit `case p @ Project(_, child) if sameOutput(child.output, p.output) => child` later
    ```
    case p @ Project(_, g: Generate) if p.references != g.outputSet => ...
    ```
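The point of moving the condition into a pattern guard is that a `case` whose guard fails lets the match continue to later cases, whereas an `if` inside the case body does not. A minimal plain-Scala sketch of that difference follows, using a toy plan type. `Plan`, `Leaf`, `Project`, and both functions here are hypothetical stand-ins defined for this example, not Catalyst classes.

```scala
// Illustrative model only: a toy plan tree, not Catalyst.
object GuardFallThrough {
  sealed trait Plan
  case class Leaf(cols: Set[String]) extends Plan
  case class Project(cols: Set[String], child: Plan) extends Plan

  // Condition inside the body: the first case always wins for Project-over-Leaf,
  // so the later no-op rule is never reached for it.
  def withoutGuard(p: Plan): String = p match {
    case Project(cols, Leaf(out)) => if (cols == out) "unchanged" else "pruned"
    case Project(_, _) => "no-op project removed"
    case _ => "other"
  }

  // Condition as a guard: when it fails, matching falls through to the next case.
  def withGuard(p: Plan): String = p match {
    case Project(cols, Leaf(out)) if cols != out => "pruned"
    case Project(_, _) => "no-op project removed"
    case _ => "other"
  }
}
```

For a `Project` whose columns equal its child's, `withoutGuard` stops at the first case, while `withGuard` reaches the later rule.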
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158890267

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -444,12 +444,17 @@ object ColumnPruning extends Rule[LogicalPlan] {
           f.copy(child = prunedChild(child, f.references))
         case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
           e.copy(child = prunedChild(child, e.references))
    -    case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty =>
    -      g.copy(child = prunedChild(g.child, g.references))
    -    // Turn off `join` for Generate if no column from it's child is used
    -    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
    -      p.copy(child = g.copy(join = false))
    +    // prune unrequired references
    +    case p @ Project(_, g: Generate) =>
    +      if (p.references == g.outputSet) {
    +        p
    +      } else {
    +        val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
    +        val newChild = prunedChild(g.child, requiredAttrs)
    +        p.copy(child = g.copy(child = newChild,
    +          unrequiredChildOutput = (g.generator.references -- p.references).toSeq))
    --- End diff --

    nit: code style
    ```
    val unrequired = (g.generator.references -- p.references).toSeq
    p.copy(child = g.copy(child = newChild, unrequiredChildOutput = unrequired))
    ```
    Avoid very long statements; create local variables to shorten them.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158890134

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -1605,7 +1607,8 @@ class Analyzer(
             resolvedGenerator =
               Generate(
                 generator,
    -            join = projectList.size > 1, // Only join if there are other expressions in SELECT.
    +            // unrequiredChildOutput=Nil if there are other expressions in SELECT.
    +            unrequiredChildOutput = if (projectList.size > 1) Nil else child.output,
    --- End diff --

    maybe just use `Nil` here and let the optimizer do the work.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158890001

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
           f.copy(child = prunedChild(child, f.references))
         case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
           e.copy(child = prunedChild(child, e.references))
    -    case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty =>
    -      g.copy(child = prunedChild(g.child, g.references))
    -    // Turn off `join` for Generate if no column from it's child is used
    -    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
    -      p.copy(child = g.copy(join = false))
    +    // prune unrequired references
    +    case g : Generate
    +      if (AttributeSet(g.unrequiredChildOutput) -- g.generator.references).nonEmpty =>
    +      g.copy(child = prunedChild(g.child,
    +        g.child.outputSet -- g.unrequiredChildOutput ++ g.generator.references),
    +        unrequiredChildOutput = g.generator.references.toSeq)
    +
    +    // Sync Generate's unrequiredChildOutput with the actual needed outputs
    +    case p @ Project(_, g: Generate) =>
    --- End diff --

    makes sense, we should move this rule below `case p @ Project(_, child) if sameOutput(child.output, p.output) => child`
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158889881

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -83,15 +85,17 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
      */
     case class Generate(
         generator: Generator,
    -    join: Boolean,
    +    unrequiredChildOutput: Seq[Attribute],
    --- End diff --

    we can just add it in the constructor.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158854831

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -83,15 +85,17 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
      */
     case class Generate(
         generator: Generator,
    -    join: Boolean,
    +    unrequiredChildOutput: Seq[Attribute],
    --- End diff --

    not sure where, just add it to the "override lazy val resolved: Boolean"?
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158854751

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
           f.copy(child = prunedChild(child, f.references))
         case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
           e.copy(child = prunedChild(child, e.references))
    -    case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty =>
    -      g.copy(child = prunedChild(g.child, g.references))
    -    // Turn off `join` for Generate if no column from it's child is used
    -    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
    -      p.copy(child = g.copy(join = false))
    +    // prune unrequired references
    +    case g : Generate
    +      if (AttributeSet(g.unrequiredChildOutput) -- g.generator.references).nonEmpty =>
    +      g.copy(child = prunedChild(g.child,
    +        g.child.outputSet -- g.unrequiredChildOutput ++ g.generator.references),
    +        unrequiredChildOutput = g.generator.references.toSeq)
    +
    +    // Sync Generate's unrequiredChildOutput with the actual needed outputs
    +    case p @ Project(_, g: Generate) =>
    --- End diff --

    Yep, I agree. Although, I'm not sure why we "swallow" this case here instead of letting it proceed to other rules like:

        // Eliminate no-op Projects
        case p @ Project(_, child) if sameOutput(child.output, p.output) => child

    because in effect the Generate is now doing the job of the Project.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158845778

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala ---
    @@ -276,22 +276,24 @@ class PlanParserSuite extends AnalysisTest {
         assertEqual(
           "select * from t lateral view explode(x) expl as x",
           table("t")
    -        .generate(explode, join = true, outer = false, Some("expl"), Seq("x"))
    +        .generate(explode, alias = Some("expl"), outputNames = Seq("x"))
             .select(star()))
         // Multiple lateral views
    +    val exploded = table("t")
    +      .generate(explode, alias = Some("expl"))
    +
         assertEqual(
           """select *
             |from t
             |lateral view explode(x) expl
             |lateral view outer json_tuple(x, y) jtup q, z""".stripMargin,
    -      table("t")
    -        .generate(explode, join = true, outer = false, Some("expl"), Seq.empty)
    -        .generate(jsonTuple, join = true, outer = true, Some("jtup"), Seq("q", "z"))
    +      exploded
    +        .generate(jsonTuple, outer = true, alias = Some("jtup"), outputNames = Seq("q", "z"))
    --- End diff --

    can we keep the previous code style and inline `exploded`?
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158845677

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala ---
    @@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest {
           CollapseProject) :: Nil
       }
    -  test("Column pruning for Generate when Generate.join = false") {
    -    val input = LocalRelation('a.int, 'b.array(StringType))
    +  test("Column pruning for Generate when Generate.unrequiredChildOutput = Nil ") {
    +    val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
    -    val query = input.generate(Explode('b), join = false).analyze
    +    val query =
    +      input
    +        .generate(Explode('c), outputNames = "explode" :: Nil)
    +        .select('a, 'explode)
    +        .analyze
         val optimized = Optimize.execute(query)
    -    val correctAnswer = input.select('b).generate(Explode('b), join = false).analyze
    +    val correctAnswer =
    +      input
    +        .select('a, 'c)
    +        .generate(Explode('c), unrequiredChildOutput = input.select('c).analyze.references.toSeq,
    +          outputNames = "explode" :: Nil)
    +        .select('a, 'explode)
    +        .analyze
         comparePlans(optimized, correctAnswer)
       }
    -  test("Column pruning for Generate when Generate.join = true") {
    -    val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
    +  test("Fill Generate.unrequiredChildOutput if possible") {
    +    val input = LocalRelation('b.array(StringType))
         val query =
           input
    -        .generate(Explode('c), join = true, outputNames = "explode" :: Nil)
    -        .select('a, 'explode)
    +        .generate(Explode('b), outputNames = "explode" :: Nil)
    +        .select(('explode + 1).as("result"))
             .analyze
         val optimized = Optimize.execute(query)
         val correctAnswer =
           input
    -        .select('a, 'c)
    -        .generate(Explode('c), join = true, outputNames = "explode" :: Nil)
    -        .select('a, 'explode)
    +        .generate(Explode('b), unrequiredChildOutput = input.references.toSeq,
    +          outputNames = "explode" :: Nil)
    +        .select(('explode + 1).as("result"))
             .analyze
         comparePlans(optimized, correctAnswer)
       }
    -  test("Turn Generate.join to false if possible") {
    -    val input = LocalRelation('b.array(StringType))
    +  test("Another fill Generate.unrequiredChildOutput if possible") {
    +    val input = LocalRelation('a.int, 'b.int, 'c1.string, 'c2.string)
         val query =
           input
    -        .generate(Explode('b), join = true, outputNames = "explode" :: Nil)
    -        .select(('explode + 1).as("result"))
    +        .generate(Explode(CreateArray(Seq('c1, 'c2))), outputNames = "explode" :: Nil)
    +        .select('a, 'c1, 'explode)
             .analyze
         val optimized = Optimize.execute(query)
         val correctAnswer =
           input
    -        .generate(Explode('b), join = false, outputNames = "explode" :: Nil)
    -        .select(('explode + 1).as("result"))
    +        .select('a, 'c1, 'c2)
    +        .generate(Explode(CreateArray(Seq('c1, 'c2))),
    +          unrequiredChildOutput = input.select('c2).analyze.output,
    --- End diff --

    ditto
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158845618 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala --- @@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest { CollapseProject) :: Nil } - test("Column pruning for Generate when Generate.join = false") { -val input = LocalRelation('a.int, 'b.array(StringType)) + test("Column pruning for Generate when Generate.unrequiredChildOutput = Nil ") { +val input = LocalRelation('a.int, 'b.int, 'c.array(StringType)) -val query = input.generate(Explode('b), join = false).analyze +val query = + input +.generate(Explode('c), outputNames = "explode" :: Nil) +.select('a, 'explode) +.analyze val optimized = Optimize.execute(query) -val correctAnswer = input.select('b).generate(Explode('b), join = false).analyze +val correctAnswer = + input +.select('a, 'c) +.generate(Explode('c), unrequiredChildOutput = input.select('c).analyze.references.toSeq, + outputNames = "explode" :: Nil) +.select('a, 'explode) +.analyze comparePlans(optimized, correctAnswer) } - test("Column pruning for Generate when Generate.join = true") { -val input = LocalRelation('a.int, 'b.int, 'c.array(StringType)) + test("Fill Generate.unrequiredChildOutput if possible") { +val input = LocalRelation('b.array(StringType)) val query = input -.generate(Explode('c), join = true, outputNames = "explode" :: Nil) -.select('a, 'explode) +.generate(Explode('b), outputNames = "explode" :: Nil) +.select(('explode + 1).as("result")) .analyze val optimized = Optimize.execute(query) val correctAnswer = input -.select('a, 'c) -.generate(Explode('c), join = true, outputNames = "explode" :: Nil) -.select('a, 'explode) +.generate(Explode('b), unrequiredChildOutput = input.references.toSeq, --- End diff -- input.output --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158845516
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala ---
@@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest {
         CollapseProject) :: Nil
   }
-  test("Column pruning for Generate when Generate.join = false") {
-    val input = LocalRelation('a.int, 'b.array(StringType))
+  test("Column pruning for Generate when Generate.unrequiredChildOutput = Nil ") {
+    val input = LocalRelation('a.int, 'b.int, 'c.array(StringType))
-    val query = input.generate(Explode('b), join = false).analyze
+    val query =
+      input
+        .generate(Explode('c), outputNames = "explode" :: Nil)
+        .select('a, 'explode)
--- End diff --
To match the test name, we should refer to `c` here and check whether the optimizer can prune `a` and `b` without changing `unrequiredChildOutput`.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158845436
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
       f.copy(child = prunedChild(child, f.references))
     case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
       e.copy(child = prunedChild(child, e.references))
-    case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty =>
-      g.copy(child = prunedChild(g.child, g.references))
-    // Turn off `join` for Generate if no column from it's child is used
-    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
-      p.copy(child = g.copy(join = false))
+    // prune unrequired references
+    case g : Generate
+      if (AttributeSet(g.unrequiredChildOutput) -- g.generator.references).nonEmpty =>
+      g.copy(child = prunedChild(g.child,
+        g.child.outputSet -- g.unrequiredChildOutput ++ g.generator.references),
+        unrequiredChildOutput = g.generator.references.toSeq)
+
+    // Sync Generate's unrequiredChildOutput with the actual needed outputs
+    case p @ Project(_, g: Generate) =>
--- End diff --
Seems like we can merge this case with the above case, as column pruning only works if there is a `Project` above `Generate`:
```
case p @ Project(_, g: Generate) =>
  if (p.references == g.outputSet) {
    p
  } else {
    val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
    val newChild = prunedChild(g.child, requiredAttrs)
    p.copy(child = g.copy(child = newChild,
      unrequiredChildOutput = (g.generator.references -- p.references).toSeq))
  }
```
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158843095 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala --- @@ -38,54 +38,67 @@ class ColumnPruningSuite extends PlanTest { CollapseProject) :: Nil } - test("Column pruning for Generate when Generate.join = false") { -val input = LocalRelation('a.int, 'b.array(StringType)) + test("Column pruning for Generate when Generate.unrequiredChildOutput = Nil ") { +val input = LocalRelation('a.int, 'b.int, 'c.array(StringType)) -val query = input.generate(Explode('b), join = false).analyze +val query = + input +.generate(Explode('c), outputNames = "explode" :: Nil) +.select('a, 'explode) +.analyze val optimized = Optimize.execute(query) -val correctAnswer = input.select('b).generate(Explode('b), join = false).analyze +val correctAnswer = + input +.select('a, 'c) +.generate(Explode('c), unrequiredChildOutput = input.select('c).analyze.references.toSeq, --- End diff -- just `unrequiredChildOutput = input.output(2)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158842896
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -83,15 +85,17 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  */
 case class Generate(
     generator: Generator,
-    join: Boolean,
+    unrequiredChildOutput: Seq[Attribute],
--- End diff --
Shall we add an assert to make sure `unrequiredChildOutput` is always resolved?
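As a rough illustration of the kind of check being asked for, here is a minimal sketch that does not use Catalyst at all. `Attr` and `GenerateNode` are hypothetical stand-ins, and `require` is used in place of `assert`; whether the real `Generate` should validate in its constructor is a design question this sketch does not settle.

```scala
object ResolvedCheckSketch {
  // Hypothetical stand-in for a Catalyst attribute; it only tracks a resolved flag.
  final case class Attr(name: String, resolved: Boolean = true)

  // Simplified node (not the real Generate): construction fails fast when an
  // unresolved attribute is passed as part of unrequiredChildOutput.
  final case class GenerateNode(unrequiredChildOutput: Seq[Attr]) {
    private val unresolved: Seq[String] =
      unrequiredChildOutput.filterNot(_.resolved).map(_.name)

    require(
      unresolved.isEmpty,
      "unrequiredChildOutput must be resolved, got: " + unresolved.mkString(", "))
  }
}
```

Constructing `GenerateNode(Seq(Attr("b", resolved = false)))` throws an `IllegalArgumentException`, while a list of resolved attributes is accepted.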
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158842853
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -83,15 +85,17 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  */
 case class Generate(
     generator: Generator,
-    join: Boolean,
+    unrequiredChildOutput: Seq[Attribute],
     outer: Boolean,
     qualifier: Option[String],
     generatorOutput: Seq[Attribute],
     child: LogicalPlan)
   extends UnaryNode {
-  /** The set of all attributes produced by this node. */
-  def generatedSet: AttributeSet = AttributeSet(generatorOutput)
+  def requiredChildOutput(): Seq[Attribute] = {
--- End diff --
Using a `lazy val` here would save some recomputation.
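The difference between a `def` and a `lazy val` can be seen in a small stand-alone sketch. The `Node` class and its counters are hypothetical and exist only to make the number of evaluations observable; attributes are modeled as plain strings rather than Catalyst `Attribute`s.

```scala
object LazyValSketch {
  // Hypothetical stand-in for a plan node; not Catalyst's Generate.
  final case class Node(childOutput: Seq[String], unrequired: Seq[String]) {
    // Counters that record how often each variant is actually evaluated.
    var defEvaluations = 0
    var lazyEvaluations = 0

    // A def is re-evaluated on every call.
    def requiredViaDef: Seq[String] = {
      defEvaluations += 1
      childOutput.filterNot(a => unrequired.contains(a))
    }

    // A lazy val is evaluated on first access and cached afterwards.
    lazy val requiredViaLazyVal: Seq[String] = {
      lazyEvaluations += 1
      childOutput.filterNot(a => unrequired.contains(a))
    }
  }
}
```

Caching is only safe when the inputs never change, which holds for an immutable case class like the one sketched here.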
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158842777
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
       f.copy(child = prunedChild(child, f.references))
     case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
       e.copy(child = prunedChild(child, e.references))
-    case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty =>
-      g.copy(child = prunedChild(g.child, g.references))
-    // Turn off `join` for Generate if no column from it's child is used
-    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
-      p.copy(child = g.copy(join = false))
+    // prune unrequired references
+    case g : Generate
+      if (AttributeSet(g.unrequiredChildOutput) -- g.generator.references).nonEmpty =>
+      g.copy(child = prunedChild(g.child,
+        g.child.outputSet -- g.unrequiredChildOutput ++ g.generator.references),
+        unrequiredChildOutput = g.generator.references.toSeq)
--- End diff --
This looks pretty complicated. How about
```
case g: Generate if (g.child.outputSet -- g.references -- g.requiredChildOutput).nonEmpty =>
  g.copy(child = prunedChild(g.child, g.references))
```
The logic is pretty simple: if the child output has something that is not referred to by the generator or by the parent, prune the child.
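The guard in that suggestion is plain set arithmetic, which a stand-alone sketch can make concrete. Attributes are modeled as strings and the function names are hypothetical; this only mirrors the condition, not Catalyst's `AttributeSet` or the `prunedChild` helper.

```scala
object PruneConditionSketch {
  // Child columns that neither the generator nor the parent needs.
  def prunable(
      childOutput: Set[String],
      generatorRefs: Set[String],
      requiredByParent: Set[String]): Set[String] =
    childOutput -- generatorRefs -- requiredByParent

  // Mirrors the shape of the suggested guard: prune only if something is left over.
  def shouldPrune(
      childOutput: Set[String],
      generatorRefs: Set[String],
      requiredByParent: Set[String]): Boolean =
    prunable(childOutput, generatorRefs, requiredByParent).nonEmpty
}
```

With child output `{a, b, c}`, a generator that reads `c`, and a parent that needs `a`, only `b` is prunable. Once `b` is gone the guard no longer fires, so the rule stops matching.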
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158814175
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  * their output.
  *
  * @param generator the generator expression
- * @param join when true, each output row is implicitly joined with the input tuple that produced
- * it.
+ * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the
--- End diff --
Generally I agree, but in that case we would have to "fill" it in a few places when we initialise the object, instead of starting with a `Nil` sequence. This way, all the work is done entirely in the Optimizer.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158781421 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend * their output. * * @param generator the generator expression - * @param join when true, each output row is implicitly joined with the input tuple that produced - * it. + * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the --- End diff -- Is it mandatory to use negative ones? `requiredChildOutput` seems more straightforward. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158751974
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -57,19 +59,15 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
  */
 case class GenerateExec(
     generator: Generator,
-    join: Boolean,
+    unrequiredChildOutput: Seq[Attribute],
     outer: Boolean,
     generatorOutput: Seq[Attribute],
     child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {
-  override def output: Seq[Attribute] = {
-    if (join) {
-      child.output ++ generatorOutput
-    } else {
-      generatorOutput
-    }
-  }
+  private def requiredChildOutput() = child.output.filterNot(unrequiredChildOutput.contains)
--- End diff --
This is `O(n^2)`; maybe build a set first:
```
private def requiredChildOutput() = {
  val unrequiredSet = AttributeSet(unrequiredChildOutput)
  child.output.filterNot(unrequiredSet.contains)
}
```
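The complexity point carries over to ordinary Scala collections, so it can be sketched without Catalyst. Attributes are modeled as strings and both function names are hypothetical: `Seq.contains` scans the whole sequence on each call, whereas a hash-based `Set` answers membership in roughly constant time.

```scala
object RequiredOutputSketch {
  // Each contains call scans `unrequired`, so the total work is
  // childOutput.size * unrequired.size in the worst case.
  def requiredQuadratic(childOutput: Seq[String], unrequired: Seq[String]): Seq[String] =
    childOutput.filterNot(a => unrequired.contains(a))

  // Build the set once, then do one hashed lookup per child column.
  def requiredWithSet(childOutput: Seq[String], unrequired: Seq[String]): Seq[String] = {
    val unrequiredSet = unrequired.toSet
    childOutput.filterNot(unrequiredSet.contains)
  }
}
```

Both variants return the same columns in the same order; only the lookup cost differs, which matters mainly for wide schemas.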
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158751868 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala --- @@ -276,22 +276,28 @@ class PlanParserSuite extends AnalysisTest { assertEqual( "select * from t lateral view explode(x) expl as x", table("t") -.generate(explode, join = true, outer = false, Some("expl"), Seq("x")) +.generate(explode, unrequiredChildOutput = Nil, outer = false, + Some("expl"), Seq("x")) .select(star())) // Multiple lateral views +val exploded = table("t") + .generate(explode, unrequiredChildOutput = Nil, outer = false, --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158751828 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala --- @@ -38,54 +38,69 @@ class ColumnPruningSuite extends PlanTest { CollapseProject) :: Nil } - test("Column pruning for Generate when Generate.join = false") { -val input = LocalRelation('a.int, 'b.array(StringType)) + test("Column pruning for Generate when Generate.unrequiredChildOutput = Nil") { +val input = LocalRelation('a.int, 'b.int, 'c.array(StringType)) -val query = input.generate(Explode('b), join = false).analyze +val query = + input +.generate(Explode('c), unrequiredChildOutput = Nil, outputNames = "explode" :: Nil) --- End diff -- https://github.com/apache/spark/pull/19683/files#diff-f475ef52a0c0bf5800a1b0cbc5420876R362 `unrequiredChildOutput` is `Nil` by default, we don't need to set it here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158751840 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala --- @@ -628,14 +628,14 @@ class FilterPushdownSuite extends PlanTest { test("generate: predicate referenced no generated column") { val originalQuery = { testRelationWithArrayType -.generate(Explode('c_arr), true, false, Some("arr")) +.generate(Explode('c_arr), unrequiredChildOutput = Nil, false, Some("arr")) .where(('b >= 5) && ('a > 6)) } val optimized = Optimize.execute(originalQuery.analyze) val correctAnswer = { testRelationWithArrayType .where(('b >= 5) && ('a > 6)) -.generate(Explode('c_arr), true, false, Some("arr")).analyze +.generate(Explode('c_arr), unrequiredChildOutput = Nil, false, Some("arr")).analyze --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158751759 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala --- @@ -38,54 +38,69 @@ class ColumnPruningSuite extends PlanTest { CollapseProject) :: Nil } - test("Column pruning for Generate when Generate.join = false") { -val input = LocalRelation('a.int, 'b.array(StringType)) + test("Column pruning for Generate when Generate.unrequiredChildOutput = Nil") { --- End diff -- `join=false` means `unrequiredChildOutput=child.output` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
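The correspondence between the old `join` flag and the new parameter can be written down as a small stand-alone sketch. Attributes are modeled as strings and all names below are hypothetical; `oldOutput` follows the removed `GenerateExec.output` shown in the diffs of this thread, and `newOutput` assumes the output is the required child columns followed by the generator output.

```scala
object JoinFlagSketch {
  // join = true keeps every child column (nothing is unrequired);
  // join = false drops them all (everything is unrequired).
  def unrequiredFor(join: Boolean, childOutput: Seq[String]): Seq[String] =
    if (join) Nil else childOutput

  // Output under the old boolean flag.
  def oldOutput(join: Boolean, childOutput: Seq[String], generatorOutput: Seq[String]): Seq[String] =
    if (join) childOutput ++ generatorOutput else generatorOutput

  // Output under the attribute-list model (assumed shape, see lead-in).
  def newOutput(
      childOutput: Seq[String],
      unrequired: Seq[String],
      generatorOutput: Seq[String]): Seq[String] = {
    val unrequiredSet = unrequired.toSet
    childOutput.filterNot(unrequiredSet.contains) ++ generatorOutput
  }
}
```

For both values of `join`, `newOutput(child, unrequiredFor(join, child), gen)` equals `oldOutput(join, child, gen)`, so the two extremes of the list reproduce the boolean behaviour and everything in between is new expressiveness.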
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158751546 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] { f.copy(child = prunedChild(child, f.references)) case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty => e.copy(child = prunedChild(child, e.references)) -case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty => - g.copy(child = prunedChild(g.child, g.references)) -// Turn off `join` for Generate if no column from it's child is used -case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => - p.copy(child = g.copy(join = false)) +// Sync Generate's unrequiredChildOutput with the actual needed outputs +case p @ Project(_, g: Generate) => + val actualUnrequired = g.child.outputSet -- p.references + if (actualUnrequired == AttributeSet(g.unrequiredChildOutput)) { +p + } else { +p.copy(child = g.copy(unrequiredChildOutput = actualUnrequired.toSeq)) + } + +// prune unrequired references +case g : Generate --- End diff -- or merge this with `case p @ Project(_, g: Generate)` ``` case p @ Project(_, g: Generate) => val unrequiredByGenerator = g.child.outputSet -- g.references val unrequiredByProj = g.child.outputSet -- p.references if (unrequiredByGenerator.isEmpty && unrequiredByProj.isEmpty) { p } else { p.copy(child = g.copy(unrequiredChildOutput = unrequiredByProj.toSeq, child = prunedChild(g.child, g.references))) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158751339 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -444,12 +444,22 @@ object ColumnPruning extends Rule[LogicalPlan] { f.copy(child = prunedChild(child, f.references)) case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty => e.copy(child = prunedChild(child, e.references)) -case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty => - g.copy(child = prunedChild(g.child, g.references)) -// Turn off `join` for Generate if no column from it's child is used -case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => - p.copy(child = g.copy(join = false)) +// Sync Generate's unrequiredChildOutput with the actual needed outputs +case p @ Project(_, g: Generate) => + val actualUnrequired = g.child.outputSet -- p.references + if (actualUnrequired == AttributeSet(g.unrequiredChildOutput)) { +p + } else { +p.copy(child = g.copy(unrequiredChildOutput = actualUnrequired.toSeq)) + } + +// prune unrequired references +case g : Generate --- End diff -- We may never reach here if there is a `Project(_, g: Generate)`. We should keep the previous order, move `case g : Generate` before `case p @ Project(_, g: Generate)`, and do ``` case g: Generate if g.requiredChildOutput.isEmpty && (g.child.outputSet -- g.references).nonEmpty => ... ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158751132 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -608,8 +608,8 @@ trait CheckAnalysis extends PredicateHelper { // allows to have correlation under it // but must not host any outer references. // Note: - // Generator with join=false is treated as Category 4. - case g: Generate if g.join => + // Generator with requiredChildOutput.isEmpty is treated as Category 4. --- End diff -- The comment is wrong --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158751010 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1605,7 +1607,8 @@ class Analyzer( resolvedGenerator = Generate( generator, -join = projectList.size > 1, // Only join if there are other expressions in SELECT. +// Only unrequiredChildOutput if there are other expressions in SELECT. --- End diff -- `unrequiredChildOutput=Nil if there are other expressions in SELECT.` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158750949 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -695,7 +695,7 @@ class Analyzer( (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions))) case oldVersion: Generate -if oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty => +if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty => --- End diff -- ah i see --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158737578 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -846,12 +846,13 @@ class Analyzer( // ResolveReferences. Attributes in the output will be resolved by ResolveGenerate. case g @ Generate(generator, _, _, _, _, _) if generator.resolved => g - case g @ Generate(generator, join, outer, qualifier, output, child) => + case g @ Generate(generator, requiredChildOutput, outer, qualifier, output, child) => val newG = resolveExpression(generator, child, throws = true) if (newG.fastEquals(generator)) { g } else { - Generate(newG.asInstanceOf[Generator], join, outer, qualifier, output, child) + Generate(newG.asInstanceOf[Generator], child.output, outer, --- End diff -- agree --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158737560
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -695,7 +695,7 @@ class Analyzer(
           (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))
         case oldVersion: Generate
-            if oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
+            if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
--- End diff --
Well, the two implementations were identical in class `Generate`:
    def generatedSet: AttributeSet = AttributeSet(generatorOutput)
    override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158717309 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -623,7 +623,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val expressions = expressionList(ctx.expression) Generate( UnresolvedGenerator(visitFunctionName(ctx.qualifiedName), expressions), - join = true, + requiredChildOutput = query.references.toSeq, --- End diff -- maybe `query.output`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158717147 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -444,12 +444,14 @@ object ColumnPruning extends Rule[LogicalPlan] { f.copy(child = prunedChild(child, f.references)) case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty => e.copy(child = prunedChild(child, e.references)) -case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty => +case g: Generate + if g.requiredChildOutput.isEmpty && (g.child.outputSet -- g.references).nonEmpty => g.copy(child = prunedChild(g.child, g.references)) -// Turn off `join` for Generate if no column from it's child is used -case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => - p.copy(child = g.copy(join = false)) +// Sync Generate's requiredChildOutput with the actual needed outputs +case p @ Project(_, g: Generate) + if (AttributeSet(g.requiredChildOutput) -- (p.references--g.producedAttributes)).nonEmpty => + p.copy(child = g.copy(requiredChildOutput = (p.references--g.producedAttributes).toSeq)) --- End diff -- we can simplify this ``` case p @ Project(_, g: Generate) => val actualRequired = p.references -- g.producedAttributes if (AttributeSet(g.requiredChildOutput) == actualRequired) { p } else { p.copy(child = g.copy(requiredChildOutput = actualRequired.toSeq)) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
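The simplified rule above can be modeled on plain sets to show its two branches. This is a sketch under stated assumptions: `Generate` and `Project` below are hypothetical two-field stand-ins, not the Catalyst classes, and attributes are strings.

```scala
object SyncRequiredSketch {
  // Hypothetical miniature plan nodes; only the fields the rule looks at are kept.
  final case class Generate(requiredChildOutput: Set[String], producedAttributes: Set[String])
  final case class Project(references: Set[String], child: Generate)

  // Mirrors the suggested rule: what the projection needs from the child is
  // everything it references minus what the generator itself produces.
  def sync(p: Project): Project = {
    val actualRequired = p.references -- p.child.producedAttributes
    if (p.child.requiredChildOutput == actualRequired) {
      p // already in sync: return the plan unchanged
    } else {
      p.copy(child = p.child.copy(requiredChildOutput = actualRequired))
    }
  }
}
```

Returning the plan unchanged in the first branch makes the rule idempotent (`sync(sync(p)) == sync(p)`), which is presumably what an optimizer that reruns rules until nothing changes needs in order to stop.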
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158716646 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1151,7 +1152,8 @@ class Analyzer( // If join is false, we will convert it to true for getting from the child the missing // attributes that its child might have or could have. val missing = missingAttrs -- g.child.outputSet - g.copy(join = true, child = addMissingAttr(g.child, missing)) + g.copy(requiredChildOutput = g.child.outputSet.toSeq, --- End diff -- nit `requiredChildOutput = g.child.output` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158716569
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -846,12 +846,13 @@ class Analyzer(
       // ResolveReferences. Attributes in the output will be resolved by ResolveGenerate.
       case g @ Generate(generator, _, _, _, _, _) if generator.resolved => g
-      case g @ Generate(generator, join, outer, qualifier, output, child) =>
+      case g @ Generate(generator, requiredChildOutput, outer, qualifier, output, child) =>
         val newG = resolveExpression(generator, child, throws = true)
         if (newG.fastEquals(generator)) {
           g
         } else {
-          Generate(newG.asInstanceOf[Generator], join, outer, qualifier, output, child)
+          Generate(newG.asInstanceOf[Generator], child.output, outer,
--- End diff --
We should preserve `requiredChildOutput` here.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158716496 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -695,7 +695,7 @@ class Analyzer( (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions))) case oldVersion: Generate -if oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty => +if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty => --- End diff -- Is this correct? `AttributeSet.intersect` is special. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158716207 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -115,7 +113,8 @@ case class Generate( } def output: Seq[Attribute] = { -if (join) child.output ++ qualifiedGeneratorOutput else qualifiedGeneratorOutput +if (requiredChildOutput.nonEmpty) child.output ++ qualifiedGeneratorOutput --- End diff -- This should be `child.output ++ requiredChildOutput` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158552882 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -451,6 +451,11 @@ object ColumnPruning extends Rule[LogicalPlan] { case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) +// Turn on `omitGeneratorReferences` for Generate if its child column is not used +case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, _, _, _)) + if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty => --- End diff -- sounds reasonable. let me try that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158500070 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -451,6 +451,11 @@ object ColumnPruning extends Rule[LogicalPlan] { case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) +// Turn on `omitGeneratorReferences` for Generate if its child column is not used +case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, _, _, _)) + if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty => --- End diff -- Actually we can do better, the project may only need part of the generator references. How about we add a new parameter `requiredChildOutputs: Seq[Attribute]` to replace the `join` and `omitGeneratorReferences`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
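A small stand-alone sketch of why an attribute list can do better than the two boolean flags. It assumes `omitGeneratorReferences` means "drop the generator's input columns from the output", as the diff comment suggests; attributes are modeled as strings and both function names are hypothetical.

```scala
object PartialRequirementSketch {
  // Flag-style check: the generator's input columns can be dropped only if the
  // projection uses none of them (all or nothing).
  def canOmitAllGeneratorRefs(projectRefs: Set[String], generatorRefs: Set[String]): Boolean =
    projectRefs.intersect(generatorRefs).isEmpty

  // Attribute-level alternative: keep exactly the child columns the projection uses.
  def requiredChildOutput(childOutput: Seq[String], projectRefs: Set[String]): Seq[String] =
    childOutput.filter(projectRefs.contains)
}
```

Take a child with columns `a, b, c1, c2`, a generator that reads `c1` and `c2`, and a projection over `a`, `c1` and the generated column. The flag-style check fails because `c1` is used, so neither `c1` nor `c2` could be dropped, while the attribute-level version keeps only `a` and `c1`.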
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158499618 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -451,6 +451,11 @@ object ColumnPruning extends Rule[LogicalPlan] { case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) +// Turn on `omitGeneratorReferences` for Generate if its child column is not used +case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, _, _, _)) + if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty => --- End diff -- Yeah, this should be
```
case p @ Project(_, g @ Generate(generator, join @ true, _, omitGeneratorRefs @ false, _, _, _))
    if p.references.intersect(generator.references).isEmpty => ...
```
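To see why the guard is phrased as an empty intersection, here is a small sketch that models attribute sets as plain `Set[String]`. The names `GuardSketch`, `differenceGuard`, and `intersectionGuard` are hypothetical, and the model is an assumption made for illustration rather than Catalyst's `AttributeSet`. With a single generator reference the two guards agree, but once the generator reads more than one column, the set-difference form fires as soon as any one of them is unused.

```scala
// Illustrative model only: plain string sets stand in for AttributeSet.
object GuardSketch {
  // Shape of the guard in the diff: true if at least one generator
  // reference is missing from the projection's references.
  def differenceGuard(generatorRefs: Set[String], projectRefs: Set[String]): Boolean =
    (generatorRefs -- projectRefs).nonEmpty

  // Shape of the suggested guard: true only if the projection uses
  // none of the generator's references.
  def intersectionGuard(generatorRefs: Set[String], projectRefs: Set[String]): Boolean =
    projectRefs.intersect(generatorRefs).isEmpty

  def main(args: Array[String]): Unit = {
    val generatorRefs = Set("col1", "col2")
    val projectRefs = Set("id", "col1") // the projection still reads col1
    println(differenceGuard(generatorRefs, projectRefs))
    println(intersectionGuard(generatorRefs, projectRefs))
  }
}
```

In the `main` example the projection still reads `col1`, so omitting every generator reference would drop a column that is needed; only the intersection guard rejects that case.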
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158488942 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -451,6 +451,11 @@ object ColumnPruning extends Rule[LogicalPlan] { case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) +// Turn on `omitGeneratorReferences` for Generate if its child column is not used +case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, _, _, _)) + if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty => --- End diff -- @cloud-fan should I also change `gu.child` to `gu.references` here?
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158436989 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -474,10 +474,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.GlobalLimitExec(limit, planLater(child)) :: Nil case logical.Union(unionChildren) => execution.UnionExec(unionChildren.map(planLater)) :: Nil - case g @ logical.Generate(generator, join, outer, _, _, child) => + case g @ logical.Generate(generator, join, outer, omitGeneratorChild, _, _, child) => execution.GenerateExec( - generator, join = join, outer = outer, g.qualifiedGeneratorOutput, - planLater(child)) :: Nil + generator, join = join, outer = outer, omitGeneratorChild = omitGeneratorChild, --- End diff -- `omitGeneratorChild = omitGeneratorChild` looks redundant; we can just pass `omitGeneratorChild`.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158436905 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala --- @@ -227,4 +227,31 @@ class MiscBenchmark extends BenchmarkBase { generate stack wholestage on 836 / 847 20.1 49.8 15.5X */ } + + ignore("generate explode big struct array") { +val BASE = 1234567890 +val N = 4 + +val spark = sparkSession +import spark.implicits._ +import org.apache.spark.sql.functions._ + +val df = sparkSession.sparkContext.parallelize( + List(("1234567890", (BASE to (BASE + N)).map( +x => (x.toString, (x + 1).toString, (x + 2).toString, (x + 3).toString)).toArray))) + .toDF("c1", "c_arr") + .select(col("c1"), explode(col("c_arr"))) + .selectExpr("c1", "col.*") + +df.cache.count --- End diff -- Can you follow the existing benchmark style in this file?
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158436824 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala --- @@ -227,4 +227,31 @@ class MiscBenchmark extends BenchmarkBase { generate stack wholestage on 836 / 847 20.1 49.8 15.5X */ } + + ignore("generate explode big struct array") { +val BASE = 1234567890 --- End diff -- Why is this needed?
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158436801 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -86,10 +96,24 @@ case class GenerateExec( child.execute().mapPartitionsWithIndexInternal { (index, iter) => val generatorNullRow = new GenericInternalRow(generator.elementSchema.length) val rows = if (join) { + +lazy val project = UnsafeProjection.create( + projectedChildOutput, + child.output, + subexpressionEliminationEnabled) + val joinedRow = new JoinedRow iter.flatMap { row => + + val projectedRow = if (omitGeneratorChild) { +project.initialize(index) +project(row) + } else { +row + } + // we should always set the left (child output) - joinedRow.withLeft(row) + joinedRow.withLeft(projectedRow) --- End diff -- Here it can just be `joinedRow.withLeft(pruneChildForResult(row))`.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158436751 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -86,10 +96,24 @@ case class GenerateExec( child.execute().mapPartitionsWithIndexInternal { (index, iter) => val generatorNullRow = new GenericInternalRow(generator.elementSchema.length) val rows = if (join) { + +lazy val project = UnsafeProjection.create( + projectedChildOutput, + child.output, + subexpressionEliminationEnabled) --- End diff -- We can simplify the code and centralize all the logic here:
```
val pruneChildForResult: InternalRow => InternalRow =
  if (omitGeneratorReferences) {
    val generatorReferences = generator.references
    val requiredChildOutput = child.output.filterNot(generatorReferences.contains)
    UnsafeProjection.create(requiredChildOutput, child.output)
  } else {
    identity
  }
```
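The pattern in the comment above, choosing the row-pruning function once and then applying it to every row, can be sketched without Spark as follows. This is an assumption-laden model: rows are plain `Vector[Any]`, columns are identified by name, and `PruneChildSketch` with its `pruneChildForResult` signature is hypothetical, not the `UnsafeProjection` API.

```scala
// Illustrative model only: rows are plain vectors, columns are names.
object PruneChildSketch {
  type Row = Vector[Any]

  // Decide once which function to apply; the per-row path then has no branch.
  def pruneChildForResult(
      childOutput: Seq[String],
      generatorRefs: Set[String],
      omitGeneratorReferences: Boolean): Row => Row =
    if (omitGeneratorReferences) {
      // Ordinals of the child columns that the generator does not read.
      val keptOrdinals = childOutput.zipWithIndex.collect {
        case (name, ordinal) if !generatorRefs.contains(name) => ordinal
      }
      (row: Row) => keptOrdinals.map(i => row(i)).toVector
    } else {
      (row: Row) => row
    }

  def main(args: Array[String]): Unit = {
    val prune = pruneChildForResult(Seq("id", "arr"), Set("arr"), omitGeneratorReferences = true)
    // The array column is dropped before the row is joined with generator output.
    println(prune(Vector(1, Vector(10, 20, 30))))
  }
}
```

Building the function outside the row loop keeps the flag check out of the hot path, which appears to be the point of centralizing the logic in one value.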
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158436519 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -86,10 +96,24 @@ case class GenerateExec( child.execute().mapPartitionsWithIndexInternal { (index, iter) => val generatorNullRow = new GenericInternalRow(generator.elementSchema.length) val rows = if (join) { + +lazy val project = UnsafeProjection.create( + projectedChildOutput, + child.output, + subexpressionEliminationEnabled) --- End diff -- No need to set this. It's a very simple projection that only deals with attributes.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158436309 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -59,13 +61,21 @@ case class GenerateExec( generator: Generator, join: Boolean, outer: Boolean, +omitGeneratorChild: Boolean, generatorOutput: Seq[Attribute], child: SparkPlan) extends UnaryExecNode with CodegenSupport { + private def projectedChildOutput = generator match { +case g: UnaryExpression if omitGeneratorChild => --- End diff -- Why limit this to `UnaryExpression`? Suppose we add an array concat function in the future: when we do `explode(array_concat(col1, col2))`, we should be able to omit both `col1` and `col2`. I'd like to add an `omitGeneratorReferences` parameter, and this can be simplified to
```
private def requiredChildOutput =
  if (omitGeneratorReferences) {
    val generatorReferences = generator.references
    child.output.filterNot(generatorReferences.contains)
  } else {
    child.output
  }
```
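The `explode(array_concat(col1, col2))` case from the comment above can be modeled with a tiny expression tree. Everything here is a hypothetical stand-in: `Expr`, `Col`, `ArrayConcat`, and `Explode` are illustration-only types, not Catalyst classes, and `ArrayConcat` mirrors the function the reviewer describes as a possible future addition. The sketch shows that filtering by the generator's full reference set drops both input columns, which a check on a single unary child could not do.

```scala
// Illustrative model only: a minimal expression tree with a references set.
object GeneratorReferencesSketch {
  sealed trait Expr { def references: Set[String] }
  final case class Col(name: String) extends Expr {
    def references: Set[String] = Set(name)
  }
  // Hypothetical binary expression, standing in for a future array concat.
  final case class ArrayConcat(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  final case class Explode(child: Expr) extends Expr {
    def references: Set[String] = child.references
  }

  // Same shape as the reviewer's snippet, over attribute names.
  def requiredChildOutput(
      childOutput: Seq[String],
      generator: Expr,
      omitGeneratorReferences: Boolean): Seq[String] =
    if (omitGeneratorReferences) {
      val generatorReferences = generator.references
      childOutput.filterNot(generatorReferences.contains)
    } else {
      childOutput
    }

  def main(args: Array[String]): Unit = {
    val generator = Explode(ArrayConcat(Col("col1"), Col("col2")))
    println(requiredChildOutput(Seq("id", "col1", "col2"), generator, omitGeneratorReferences = true))
  }
}
```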
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r153998263 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -450,6 +450,11 @@ object ColumnPruning extends Rule[LogicalPlan] { case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) +// Turn on `omitGeneratorChild` for Generate if it's child column is not used +case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, _, _, _)) + if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty => --- End diff -- doesn't compile: Type mismatch, expected: NamedExpression, actual: Expression
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r153944758 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -450,6 +450,11 @@ object ColumnPruning extends Rule[LogicalPlan] { case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) +// Turn on `omitGeneratorChild` for Generate if it's child column is not used +case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, _, _, _)) + if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty => --- End diff -- `p.references.contains(gu.child)`?
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r153944787 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -450,6 +450,11 @@ object ColumnPruning extends Rule[LogicalPlan] { case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) +// Turn on `omitGeneratorChild` for Generate if it's child column is not used --- End diff -- nit: its
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r153945541 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -59,13 +61,21 @@ case class GenerateExec( generator: Generator, join: Boolean, outer: Boolean, +omitGeneratorChild: Boolean, generatorOutput: Seq[Attribute], child: SparkPlan) extends UnaryExecNode with CodegenSupport { + private def projectedChildOutput = generator match { +case g: UnaryExpression if omitGeneratorChild => + child.output diff Seq(g.child) --- End diff -- nit: `.diff(...)`
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r151868042 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -59,15 +61,23 @@ case class GenerateExec( generator: Generator, join: Boolean, outer: Boolean, +omitGeneratorChild: Boolean, generatorOutput: Seq[Attribute], child: SparkPlan) extends UnaryExecNode with CodegenSupport { + private def projectedChildOutput = generator match { +case g: UnaryExpression if omitGeneratorChild => + (child.output diff Seq(g.child)) +case _ => + child.output + } + override def output: Seq[Attribute] = { if (join) { - child.output ++ generatorOutput -} else { - generatorOutput + projectedChildOutput ++ generatorOutput + } else { --- End diff -- fixed.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r151855849 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -59,15 +61,23 @@ case class GenerateExec( generator: Generator, join: Boolean, outer: Boolean, +omitGeneratorChild: Boolean, generatorOutput: Seq[Attribute], child: SparkPlan) extends UnaryExecNode with CodegenSupport { + private def projectedChildOutput = generator match { +case g: UnaryExpression if omitGeneratorChild => + (child.output diff Seq(g.child)) +case _ => + child.output + } + override def output: Seq[Attribute] = { if (join) { - child.output ++ generatorOutput -} else { - generatorOutput + projectedChildOutput ++ generatorOutput + } else { --- End diff -- nit: do we need to update the indentation?
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
GitHub user uzadude opened a pull request: https://github.com/apache/spark/pull/19683 [SPARK-21657][SQL] optimize explode quadratic memory consumption

## What changes were proposed in this pull request?

The issue has been raised in two Jira tickets: [SPARK-21657](https://issues.apache.org/jira/browse/SPARK-21657), [SPARK-16998](https://issues.apache.org/jira/browse/SPARK-16998). In collection generators such as explode/inline, we create many rows from each input row. Currently, each exploded row also contains the column from which it was generated. For example, if one row holds a 10k-element array, that array gets copied 10k times, once into each generated row. This results in quadratic memory consumption. However, it is common for the original column to be projected out after the explode, in which case we can avoid duplicating it. This solution identifies that situation in the optimizer and turns on a flag that omits the original column during generation.

## How was this patch tested?

1. We added a benchmark test to MiscBenchmark that shows a 16x improvement in runtime.
2. We ran some of the other tests in MiscBenchmark, and they show 15% improvements.
3. We ran this code on a specific case from our production data, with rows containing arrays of size ~200k, and it reduced the runtime from 6 hours to 3 minutes.
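The quadratic growth described above can be illustrated with a toy model in plain Scala. This is not Spark code: `ExplodeCostSketch`, its row types, and the "cells" count are hypothetical, and the count assumes every output row is materialized on its own, with its own copy of the array, as the description says happens to exploded rows.

```scala
// Illustrative model only: counts values written if each row is copied out whole.
object ExplodeCostSketch {
  // Output row that still carries the source array next to the element.
  final case class JoinedRow(key: String, arr: Vector[Int], element: Int)
  // Output row after the source array has been omitted.
  final case class PrunedRow(key: String, element: Int)

  def explodeKeepingArray(key: String, arr: Vector[Int]): Vector[JoinedRow] =
    arr.map(e => JoinedRow(key, arr, e))

  def explodeOmittingArray(key: String, arr: Vector[Int]): Vector[PrunedRow] =
    arr.map(e => PrunedRow(key, e))

  // Array-derived values per row: the whole array plus the exploded element.
  def cellsKeepingArray(rows: Vector[JoinedRow]): Long =
    rows.map(r => r.arr.length.toLong + 1L).sum

  // Array-derived values per row: just the exploded element.
  def cellsOmittingArray(rows: Vector[PrunedRow]): Long =
    rows.length.toLong

  def main(args: Array[String]): Unit = {
    val arr = Vector.tabulate(10000)(identity)
    println(cellsKeepingArray(explodeKeepingArray("k", arr)))   // n * (n + 1) values
    println(cellsOmittingArray(explodeOmittingArray("k", arr))) // n values
  }
}
```

For an array of length n, the first count grows as n * (n + 1) and the second as n, which is the gap the optimizer flag is meant to close when the array column is not needed downstream.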
You can merge this pull request into a Git repository by running: $ git pull https://github.com/uzadude/spark optimize_explode
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19683.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19683
commit ce7c3694a99584348957dc756234bb667466be4e Author: oraviv Date: 2017-11-07T11:34:21Z [SPARK-21657][SQL] optimize explode quadratic memory consumpation