peter-toth commented on a change in pull request #26044: [SPARK-29375][SQL] Exchange reuse across all subquery levels
URL: https://github.com/apache/spark/pull/26044#discussion_r349181968
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ##########
 @@ -93,11 +93,13 @@ case class HashAggregateExec(
   // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
   // map and/or the sort-based aggregation once it has processed a given number of input rows.
   private val testFallbackStartsAt: Option[(Int, Int)] = {
-    sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match {
-      case null | "" => None
-      case fallbackStartsAt =>
-        val splits = fallbackStartsAt.split(",").map(_.trim)
-        Some((splits.head.toInt, splits.last.toInt))
+    Option(sqlContext).flatMap {
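
 The `+` side of the hunk is truncated in this excerpt. A plausible completion (an assumed reconstruction, not necessarily the exact code in the PR) keeps the original parsing logic unchanged and merely wraps it in an `Option(sqlContext)` null guard:

 ```scala
 // Hypothetical completion of the new (+) side: guard against a null
 // sqlContext, which can happen when the node is reflectively re-created
 // (makeCopy) on an executor during canonicalization.
 private val testFallbackStartsAt: Option[(Int, Int)] = {
   Option(sqlContext).flatMap { ctx =>
     ctx.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match {
       case null | "" => None
       case fallbackStartsAt =>
         val splits = fallbackStartsAt.split(",").map(_.trim)
         Some((splits.head.toInt, splits.last.toInt))
     }
   }
 }
 ```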
 
 Review comment:
   This change was required because of this UT failure:
   ```
   org.scalatest.exceptions.TestFailedException: udf/postgreSQL/udf-aggregates_part3.sql - Scala UDF Expected "struct<[col:bigint]>", but got "struct<[]>" Schema did not match for query #1 select udf((select udf(count(*)) from (values (1)) t0(inner_c))) as col from (values (2),(3)) t1(outer_c): QueryOutput(select udf((select udf(count(*)) from (values (1)) t0(inner_c))) as col from (values (2),(3)) t1(outer_c),struct<>,java.lang.NullPointerException null)
   ```
   where the executor's stack trace is:
   ```
   02:43:13.445 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 8.0 (TID 10)
   org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
   HashAggregate(keys=[], functions=[partial_count(1)], output=[count#397L])
   +- Project
      +- LocalTableScan <empty>, [col1#385]

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:435)
        at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:424)
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:102)
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:63)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:132)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:261)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:772)
        at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:742)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.execution.ScalarSubquery.canonicalized$lzycompute(subquery.scala:109)
        at org.apache.spark.sql.execution.ScalarSubquery.canonicalized(subquery.scala:108)
        at org.apache.spark.sql.execution.ScalarSubquery.canonicalized(subquery.scala:62)
        at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$canonicalized$1(Expression.scala:229)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized$lzycompute(Expression.scala:229)
        at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized(Expression.scala:228)
        at org.apache.spark.sql.catalyst.expressions.Expression.semanticHash(Expression.scala:248)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.hashCode(EquivalentExpressions.scala:41)
        at scala.runtime.Statics.anyHash(Statics.java:122)
        at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
        at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
        at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
        at scala.collection.mutable.HashTable.findEntry(HashTable.scala:136)
        at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135)
        at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44)
        at scala.collection.mutable.HashMap.get(HashMap.scala:74)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:55)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:99)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1(CodeGenerator.scala:1118)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1$adapted(CodeGenerator.scala:1118)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1118)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1170)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.create(GenerateMutableProjection.scala:64)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:49)
        at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:84)
        at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:80)
        at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:47)
        at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:95)
        at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:103)
        at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:471)
        at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:116)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedConstructorAccessor41.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:468)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:467)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 151 more
   Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:96)
        ... 158 more
   ```
   Because this PR adds `lazy val canonicalized` to `ScalarSubquery`, `EvalPythonExec` now triggers canonicalization of `HashAggregateExec` on an executor, where the SparkSession is not available.
   Honestly, I'm not sure how many other `SparkPlan` nodes exist that can't be canonicalized on an executor for similar reasons.
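   To illustrate the failure mode with a self-contained sketch (not Spark code): a field initializer that dereferences a possibly-null context makes the constructor itself throw, which is fatal for reflective copying such as `makeCopy`:
   ```scala
   // Sketch: `ctx` stands in for sqlContext, which is null on executors.
   class Node(ctx: Object) {
     // Unguarded variant: would NPE in the constructor the moment
     // canonicalization reflectively re-creates the node via makeCopy.
     //   private val conf: Option[String] = Option(ctx.toString)

     // Guarded variant, mirroring the patched testFallbackStartsAt:
     private val conf: Option[String] = Option(ctx).map(_.toString)
   }

   object Demo extends App {
     new Node(null) // succeeds only with the Option(...) guard
   }
   ```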
