peter-toth commented on a change in pull request #26044: [SPARK-29375][SQL] Exchange reuse across all subquery levels
URL: https://github.com/apache/spark/pull/26044#discussion_r349181968
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
@@ -93,11 +93,13 @@ case class HashAggregateExec(
   // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
   // map and/or the sort-based aggregation once it has processed a given number of input rows.
   private val testFallbackStartsAt: Option[(Int, Int)] = {
-    sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match {
-      case null | "" => None
-      case fallbackStartsAt =>
-        val splits = fallbackStartsAt.split(",").map(_.trim)
-        Some((splits.head.toInt, splits.last.toInt))
+    Option(sqlContext).flatMap {
Review comment:
This change was required because of this UT failure:
```
org.scalatest.exceptions.TestFailedException: udf/postgreSQL/udf-aggregates_part3.sql - Scala UDF
Expected "struct<[col:bigint]>", but got "struct<[]>" Schema did not match for query #1
select udf((select udf(count(*)) from (values (1)) t0(inner_c))) as col from (values (2),(3)) t1(outer_c): QueryOutput(select udf((select udf(count(*)) from (values (1)) t0(inner_c))) as col from (values (2),(3)) t1(outer_c),struct<>,java.lang.NullPointerException null)
```
where the stacktrace on the executor is:
```
02:43:13.445 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 8.0 (TID 10)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
HashAggregate(keys=[], functions=[partial_count(1)], output=[count#397L])
+- Project
   +- LocalTableScan <empty>, [col1#385]
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:435)
    at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:424)
    at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:102)
    at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:63)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:132)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:261)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:772)
    at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:742)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
    at org.apache.spark.sql.execution.ScalarSubquery.canonicalized$lzycompute(subquery.scala:109)
    at org.apache.spark.sql.execution.ScalarSubquery.canonicalized(subquery.scala:108)
    at org.apache.spark.sql.execution.ScalarSubquery.canonicalized(subquery.scala:62)
    at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$canonicalized$1(Expression.scala:229)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized$lzycompute(Expression.scala:229)
    at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized(Expression.scala:228)
    at org.apache.spark.sql.catalyst.expressions.Expression.semanticHash(Expression.scala:248)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.hashCode(EquivalentExpressions.scala:41)
    at scala.runtime.Statics.anyHash(Statics.java:122)
    at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
    at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
    at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
    at scala.collection.mutable.HashTable.findEntry(HashTable.scala:136)
    at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135)
    at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.get(HashMap.scala:74)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:55)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:99)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1(CodeGenerator.scala:1118)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1$adapted(CodeGenerator.scala:1118)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1118)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1170)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.create(GenerateMutableProjection.scala:64)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:49)
    at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:84)
    at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:80)
    at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:47)
    at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:95)
    at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:103)
    at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:471)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:116)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedConstructorAccessor41.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:468)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:467)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 151 more
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:96)
    ... 158 more
```
Because this PR adds a `lazy val canonicalized` override to `ScalarSubquery`, `EvalPythonExec` now triggers canonicalization of `HashAggregateExec` on an executor, where no SparkSession is available. Canonicalization copies the node via `makeCopy`, which re-invokes the primary constructor through reflection (see the `Constructor.newInstance` frames above), so the constructor-time `sqlContext.getConf(...)` call dereferences a null `sqlContext` and throws the NPE at `HashAggregateExec.scala:96`. The `Option(sqlContext)` guard in the diff avoids that; see the sketch below.
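Since the diff above is truncated right after the added `Option(sqlContext).flatMap {` line, here is a sketch of how the guarded lookup can continue, reconstructed from the removed lines (the `ctx` parameter name is mine, and this is not necessarily the exact code in the PR):
```scala
private val testFallbackStartsAt: Option[(Int, Int)] = {
  // Guard against a null sqlContext: on executors there is no active
  // SparkSession, so sqlContext is null and any getConf call would NPE.
  Option(sqlContext).flatMap { ctx =>
    ctx.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match {
      case null | "" => None
      case fallbackStartsAt =>
        // e.g. "2,3" => force the fallback after 2 and 3 input rows
        val splits = fallbackStartsAt.split(",").map(_.trim)
        Some((splits.head.toInt, splits.last.toInt))
    }
  }
}
```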
Honestly, I'm not sure how many other `SparkPlan` nodes exist that can't be canonicalized on an executor for similar reasons.
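To make the failure mode concrete outside of Spark, here is a minimal self-contained sketch (all names are hypothetical, nothing below is Spark code): a constructor-time `val` re-runs on every copy of the node, so copy-based canonicalization re-dereferences the context in an environment where it may be null, and the `Option(...)` guard is what makes that safe:
```scala
// Toy model (hypothetical names): a node whose constructor reads a config
// from a context that exists on the driver but is null on executors.
case class Context(settings: Map[String, String])

class Node(ctx: Context) {
  // The unguarded version would be `ctx.settings.get("fallback")`, which
  // throws a NullPointerException whenever ctx == null.
  val fallback: Option[String] = Option(ctx).flatMap(_.settings.get("fallback"))

  // Copying a node re-runs the constructor, analogous to how
  // TreeNode.makeCopy re-invokes the primary constructor during
  // canonicalization.
  def copied(): Node = new Node(ctx)
}

object Demo extends App {
  val driverSide = new Node(Context(Map("fallback" -> "2,3")))
  println(driverSide.fallback) // Some(2,3)

  val executorSide = new Node(null) // no context, as on an executor
  println(executorSide.fallback) // None, instead of an NPE
  executorSide.copied() // copying is safe too
}
```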