bersprockets opened a new pull request, #40569:
URL: https://github.com/apache/spark/pull/40569

   ### What changes were proposed in this pull request?
   
   Change `PlanSubqueries` to set `shouldBroadcast` to true when instantiating 
an `InSubqueryExec` instance.
   
   ### Why are the changes needed?
   
   The below left outer join gets an error:
   ```
   create or replace temp view v1 as
   select * from values
   (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1),
   (2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2),
   (3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
   as v1(key, value1, value2, value3, value4, value5, value6, value7, value8, 
value9, value10);
   
   create or replace temp view v2 as
   select * from values
   (1, 2),
   (3, 8),
   (7, 9)
   as v2(a, b);
   
   create or replace temp view v3 as
   select * from values
   (3),
   (8)
   as v3(col1);
   
   set spark.sql.codegen.maxFields=10; -- let's make maxFields 10 instead of 100
   set spark.sql.adaptive.enabled=false;
   
   select *
   from v1
   left outer join v2
   on key = a
   and key in (select col1 from v3);
   ```
   The join fails during predicate codegen:
   ```
   23/03/27 12:24:12 WARN Predicate: Expr codegen error and falling back to 
interpreter mode
   java.lang.IllegalArgumentException: requirement failed: input[0, int, false] 
IN subquery#34 has not finished
        at scala.Predef$.require(Predef.scala:281)
        at 
org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
        at 
org.apache.spark.sql.execution.InSubqueryExec.doGenCode(subquery.scala:156)
        at 
org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:201)
        at scala.Option.getOrElse(Option.scala:189)
        at 
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196)
        at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$2(CodeGenerator.scala:1278)
        at scala.collection.immutable.List.map(List.scala:293)
        at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1278)
        at 
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:41)
        at 
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.generate(GeneratePredicate.scala:33)
        at 
org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:73)
        at 
org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:70)
        at 
org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51)
        at 
org.apache.spark.sql.catalyst.expressions.Predicate$.create(predicates.scala:86)
        at 
org.apache.spark.sql.execution.joins.HashJoin.boundCondition(HashJoin.scala:146)
        at 
org.apache.spark.sql.execution.joins.HashJoin.boundCondition$(HashJoin.scala:140)
        at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition$lzycompute(BroadcastHashJoinExec.scala:40)
        at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition(BroadcastHashJoinExec.scala:40)
   ```
   It fails again after fallback to interpreter mode:
   ```
   23/03/27 12:24:12 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
   java.lang.IllegalArgumentException: requirement failed: input[0, int, false] 
IN subquery#34 has not finished
        at scala.Predef$.require(Predef.scala:281)
        at 
org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
        at 
org.apache.spark.sql.execution.InSubqueryExec.eval(subquery.scala:151)
        at 
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:52)
        at 
org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2(HashJoin.scala:146)
        at 
org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2$adapted(HashJoin.scala:146)
        at 
org.apache.spark.sql.execution.joins.HashJoin.$anonfun$outerJoin$1(HashJoin.scala:205)
   ```
   Both the predicate codegen and the evaluation fail for the same reason: 
`PlanSubqueries` creates `InSubqueryExec` with `shouldBroadcast=false`. The 
driver waits for the subquery to finish, but it's the executor that uses the 
results of the subquery (for predicate codegen or evaluation). Because 
`shouldBroadcast` is set to false, the result is stored in a transient field 
(`InSubqueryExec#result`), so the result of the subquery is not serialized when 
the `InSubqueryExec` instance is sent to the executor.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   New unit test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to