JoshRosen commented on code in PR #43938:
URL: https://github.com/apache/spark/pull/43938#discussion_r1403707057
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala:
##########
@@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport {
UnsafeProjection.create(streamedBoundKeys)
@transient protected[this] lazy val boundCondition = if
(condition.isDefined) {
- if (joinType == FullOuter && buildSide == BuildLeft) {
- // Put join left side before right side. This is to be consistent with
- // `ShuffledHashJoinExec.fullOuterJoin`.
+ if ((joinType == FullOuter || joinType == LeftOuter) && buildSide ==
BuildLeft) {
+ // Put join left side before right side. This is to be consistent with
ShuffledHashJoinExec.
Review Comment:
Ah, I think I understand now:
In all of the `*Join` methods in this file (which _doesn't_ include full
outer join), the streamed side is always the left side of the `joinRow` that is
passed to `boundCondition`: in `HashJoin.scala`, the input to `boundCondition`
is not necessarily the same as the output of the join operator itself (for
that, we have `resultProj` which comes from `createResultProjection` which
remains in sync with `boundCondition`
([source](https://github.com/apache/spark/blob/aa81f424aee2db9d85f1ab6720a519b9552cc7a7/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L151-L159)).
However, `boundCondition` is inherited in `ShuffledHashJoinExec` and there
we have a `buildSideOrFullOuterJoin` method
([source](https://github.com/apache/spark/blob/aa81f424aee2db9d85f1ab6720a519b9552cc7a7/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala#L119-L158))
and there the `resultProjection` operates over the same input schema as the
join output schema (rather than assuming that the joined row always has the
streamed side on the left). In that code, `boundCondition` is evaluated over an
input that matches the join output schema and there the streamed side could be
on either side rather than only the left.
I think this inheritance is confusing and hard to reason about.
It seems like `HashJoin.scala` has an invariant of "streamed side always on
left" which gets violated in `ShuffleHashJoin.scala`'s separate implementation
of outer joins.
I wonder whether we can address this bug by modifying
`ShuffledHashJoinExec.buildSideOrFullOuterJoin ` so that it always
unconditionally places the streamed side on the left (the same as
`HashJoin.scala`'s default).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]