c21 commented on a change in pull request #29277:
URL: https://github.com/apache/spark/pull/29277#discussion_r463165676
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
##########
@@ -903,6 +904,10 @@ case class CollapseCodegenStages(
// The children of SortMergeJoin should do codegen separately.
j.withNewChildren(j.children.map(
child => InputAdapter(insertWholeStageCodegen(child))))
+ case j: ShuffledHashJoinExec =>
+ // The children of ShuffledHashJoin should do codegen separately.
+ j.withNewChildren(j.children.map(
Review comment:
@viirya - I don't think we can remove this. We have to do codegen for the
shuffled hash join children separately, because we have [a hardcoded dependency
on the build side input `inputs[1]` when building the
relation](https://github.com/apache/spark/pull/29277/files#diff-db4ffe4f0196a9d7cf1f04c350ee3381R90).
This can go wrong if we have multiple shuffled hash joins in one query.
E.g.
```
test("ShuffledHashJoin should be included in WholeStageCodegen") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30",
      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
    val df1 = spark.range(5).select($"id".as("k1"))
    val df2 = spark.range(15).select($"id".as("k2"))
    val df3 = spark.range(6).select($"id".as("k3"))
    val twoJoinsDF = df1.join(df2, $"k1" === $"k2").join(df3, $"k1" === $"k3")
  }
}
```
If we don't codegen shuffled hash join children separately, we will get
something like:
```
public void init(int index, scala.collection.Iterator[] inputs) {
  partitionIndex = index;
  this.inputs = inputs;
  inputadapter_input_0 = inputs[0];
  shj_relation_0 =
    ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[0] /* plan */)
      .buildHashedRelation(inputs[1]);
  shj_mutableStateArray_0[0] =
    new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
  shj_relation_1 =
    ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[2] /* plan */)
      .buildHashedRelation(inputs[1]);
  shj_mutableStateArray_0[1] =
    new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);

}
```
`shj_relation_0` and `shj_relation_1` will both try to build their hash
relation from the same input (which they shouldn't), because `inputs[1]` is
hardcoded there. On the other hand, I couldn't think of an alternative way to
avoid hardcoding `inputs[1]` here in codegen. Let me know if you have any
better options. Thanks. I also updated `WholeStageCodegenSuite.scala` to add a
unit test for this kind of multi-join query.
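
To make the collision concrete outside Spark, here is a toy Java model of the
problem. It is only a sketch and not Spark code: `buildRelation`, `fusedStage`
and `separateStage` are hypothetical names, and the three-element input list
for the fused case is an assumption made for illustration. It shows that when
two joins in one stage both read the hardcoded index 1, the second relation is
built from an iterator the first one already drained, while giving each join
its own stage makes index 1 refer to that join's own build side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy model, not Spark code. A "stage" stands in for one generated class
// whose init() receives that stage's input iterators.
class HardcodedInputDemo {

    // Stand-in for buildHashedRelation: consumes the whole build-side iterator.
    static List<Integer> buildRelation(Iterator<Integer> buildSide) {
        List<Integer> rows = new ArrayList<>();
        while (buildSide.hasNext()) {
            rows.add(buildSide.next());
        }
        return rows;
    }

    // Two joins fused into one stage: both read the hardcoded index 1.
    static List<List<Integer>> fusedStage(List<Iterator<Integer>> inputs) {
        List<Integer> relation0 = buildRelation(inputs.get(1));
        // Same iterator as above, already drained, so this relation is empty.
        List<Integer> relation1 = buildRelation(inputs.get(1));
        return Arrays.asList(relation0, relation1);
    }

    // One join per stage: index 1 is that stage's own build side.
    static List<Integer> separateStage(List<Iterator<Integer>> inputs) {
        return buildRelation(inputs.get(1));
    }

    public static void main(String[] args) {
        List<Integer> k1 = Arrays.asList(0, 1);
        List<Integer> k2 = Arrays.asList(0, 1, 2);
        List<Integer> k3 = Arrays.asList(7, 8);

        // Fused: the second join never sees k3.
        System.out.println(fusedStage(
            Arrays.asList(k1.iterator(), k2.iterator(), k3.iterator())));

        // Separate: each stage gets its own inputs, so index 1 is correct.
        // k1.iterator() is a placeholder for the first join's output in stage B.
        System.out.println(separateStage(Arrays.asList(k1.iterator(), k2.iterator())));
        System.out.println(separateStage(Arrays.asList(k1.iterator(), k3.iterator())));
    }
}
```

In this model the fused stage builds its second relation from nothing, whereas
the two separate stages build from `k2` and `k3` respectively, which is the
behavior the `InputAdapter` wrapping in the diff above is meant to preserve.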