c21 commented on a change in pull request #29277:
URL: https://github.com/apache/spark/pull/29277#discussion_r463165676



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
##########
@@ -903,6 +904,10 @@ case class CollapseCodegenStages(
         // The children of SortMergeJoin should do codegen separately.
         j.withNewChildren(j.children.map(
           child => InputAdapter(insertWholeStageCodegen(child))))
+      case j: ShuffledHashJoinExec =>
+        // The children of ShuffledHashJoin should do codegen separately.
+        j.withNewChildren(j.children.map(

Review comment:
       @viirya - I don't think we can remove this. We have to do shuffled hash join codegen separately, because we have [a hardcoded dependency on the build-side input `inputs[1]` when building the relation](https://github.com/apache/spark/pull/29277/files#diff-db4ffe4f0196a9d7cf1f04c350ee3381R90). This can go wrong if we have multiple shuffled hash joins in one query.
   
   E.g.
   ```
     test("ShuffledHashJoin should be included in WholeStageCodegen") {
       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30",
           SQLConf.SHUFFLE_PARTITIONS.key -> "2",
           SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
         val df1 = spark.range(5).select($"id".as("k1"))
         val df2 = spark.range(15).select($"id".as("k2"))
         val df3 = spark.range(6).select($"id".as("k3"))
        val twoJoinsDF = df1.join(df2, $"k1" === $"k2").join(df3, $"k1" === $"k3")
        // Trigger execution so both joins are planned and codegen'ed together.
        twoJoinsDF.collect()
      }
    }
   ```
   
   If we don't codegen shuffled hash join children separately, we will get 
something like:
   
   ```
   /* 018 */   public void init(int index, scala.collection.Iterator[] inputs) {
   /* 019 */     partitionIndex = index;
   /* 020 */     this.inputs = inputs;
   /* 021 */     inputadapter_input_0 = inputs[0];
   /* 022 */     shj_relation_0 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[0] /* plan */).buildHashedRelation(inputs[1]);
   /* 023 */     shj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
   /* 024 */     shj_relation_1 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[2] /* plan */).buildHashedRelation(inputs[1]);
   /* 025 */     shj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
   /* 026 */
   /* 027 */   }
   ```
   
   `shj_relation_0` and `shj_relation_1` will both try to build their hash relation from the same input (but shouldn't), because `inputs[1]` is hardcoded there. On the other hand, I couldn't think of an alternative way to avoid hardcoding `inputs[1]` in codegen. Let me know if you have any better options. Thanks. I also updated `WholeStageCodegenSuite.scala` to add a unit test for this kind of multiple-join query.
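   
   For reference, the rule being kept would look like the following. This is a sketch only: the diff above is truncated, so the body here is reconstructed by mirroring the adjacent `SortMergeJoinExec` case, which wraps each child in an `InputAdapter` so that every child becomes its own codegen stage with its own `inputs` array:
   
   ```scala
     // Sketch (not the exact diff): each child of ShuffledHashJoinExec gets its
     // own whole-stage-codegen subtree behind an InputAdapter. With separate
     // stages, each generated init() sees a fresh inputs[] array, so the
     // hardcoded inputs[1] read for the build side stays correct.
     case j: ShuffledHashJoinExec =>
       // The children of ShuffledHashJoin should do codegen separately.
       j.withNewChildren(j.children.map(
         child => InputAdapter(insertWholeStageCodegen(child))))
   ```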



