JoshRosen commented on code in PR #43938:
URL: https://github.com/apache/spark/pull/43938#discussion_r1403707057


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala:
##########
@@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport {
     UnsafeProjection.create(streamedBoundKeys)
 
   @transient protected[this] lazy val boundCondition = if (condition.isDefined) {
-    if (joinType == FullOuter && buildSide == BuildLeft) {
-      // Put join left side before right side. This is to be consistent with
-      // `ShuffledHashJoinExec.fullOuterJoin`.
+    if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) {
+      // Put join left side before right side. This is to be consistent with ShuffledHashJoinExec.

Review Comment:
   Ah, I think I understand now:
   
   In all of the `*Join` methods in this file (which _doesn't_ include full 
outer join), the streamed side is always the left side of the `joinRow` that is 
passed to `boundCondition`. In `HashJoin.scala`, the input to `boundCondition` 
is therefore not necessarily the same as the output of the join operator 
itself: for the output we have `resultProj`, which comes from 
`createResultProjection` and remains in sync with `boundCondition` 
([source](https://github.com/apache/spark/blob/aa81f424aee2db9d85f1ab6720a519b9552cc7a7/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L151-L159)).
   
   However, `boundCondition` is inherited in `ShuffledHashJoinExec`, which has 
a `buildSideOrFullOuterJoin` method 
([source](https://github.com/apache/spark/blob/aa81f424aee2db9d85f1ab6720a519b9552cc7a7/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala#L119-L158)).
There, the `resultProjection` operates over the same input schema as the join 
output schema (rather than assuming that the joined row always has the 
streamed side on the left). In that code, `boundCondition` is evaluated over an 
input that matches the join output schema, so the streamed side could be on 
either side rather than only the left.
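   
   To make the mismatch concrete, here is a toy model (not Spark code). It 
assumes a bound condition is just a function that reads columns by ordinal; 
`BoundConditionOrder`, `Row`, and `boundStreamedFirst` are hypothetical names 
made up for this sketch.
   
   ```scala
   // Toy model only: rows are Vectors and a "bound" condition reads by ordinal.
   object BoundConditionOrder {
     type Row = Vector[Int]

     // Hypothetical stand-in for `streamed.a < build.b` bound against the layout
     // streamed ++ build (one column per side): ordinal 0 = streamed.a,
     // ordinal 1 = build.b.
     val boundStreamedFirst: Row => Boolean = row => row(0) < row(1)

     def main(args: Array[String]): Unit = {
       val streamed: Row = Vector(1)
       val build: Row = Vector(5)

       // Layout the condition was bound against: streamed side on the left.
       println(boundStreamedFirst(streamed ++ build)) // true  (1 < 5)

       // Build side on the left: the same predicate now reads swapped columns.
       println(boundStreamedFirst(build ++ streamed)) // false (5 < 1)
     }
   }
   ```
   
   The predicate itself never changes; only the column order of the row it is 
handed does, which is why the binding and the row layout have to agree.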
   
   I think this inheritance is confusing and hard to reason about. 
   
   It seems like `HashJoin.scala` has an invariant of "streamed side always on 
left" which gets violated in `ShuffledHashJoinExec.scala`'s separate 
implementation of outer joins.
   
   I wonder whether we can address this bug by modifying 
`ShuffledHashJoinExec.buildSideOrFullOuterJoin` so that it unconditionally 
places the streamed side on the left when evaluating `boundCondition` (the same 
as `HashJoin.scala`'s default).
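   
   A rough sketch of that idea, again as a toy model and not the actual 
`ShuffledHashJoinExec` code: the condition always sees `streamed ++ build`, and 
the output column order is decided in a separate step. `StreamedLeftSketch`, 
`joinMatch`, and the local `BuildSide` types are assumptions for illustration 
only.
   
   ```scala
   // Toy model only: keeps condition input and join output layouts separate.
   object StreamedLeftSketch {
     type Row = Vector[Int]

     // Local stand-ins for the build-side markers; not Spark's own classes.
     sealed trait BuildSide
     case object BuildLeft extends BuildSide
     case object BuildRight extends BuildSide

     // Evaluate the condition on streamed ++ build regardless of build side,
     // then reorder the matched row into left ++ right for the output.
     def joinMatch(
         streamed: Row,
         build: Row,
         buildSide: BuildSide,
         condition: Row => Boolean): Option[Row] = {
       if (!condition(streamed ++ build)) {
         None
       } else {
         buildSide match {
           case BuildLeft => Some(build ++ streamed)  // build side is the left child
           case BuildRight => Some(streamed ++ build) // streamed side is the left child
         }
       }
     }
   }
   ```
   
   With this split, the condition binding never depends on `buildSide`; only 
the final projection does.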



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

