viirya commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1805264699


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -668,13 +668,38 @@ case class StreamingSymmetricHashJoinExec(
       private val iteratorNotEmpty: Boolean = super.hasNext
 
       override def completion(): Unit = {
-        val isLeftSemiWithMatch =
-          joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
-        // Add to state store only if both removal predicates do not match,
-        // and the row is not matched for left side of left semi join.
-        val shouldAddToState =
-          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
-          !isLeftSemiWithMatch
+        // The criteria for whether the input has to be added into the state store or not:
+        // - Left side: adding the input to the state store can be skipped if it's already matched
+        //   and the join type is left semi.
+        //   For other cases, the input should be added, including the case where it's going to be evicted
+        //   in this batch. It hasn't yet been evaluated against inputs from the right side for this batch.
+        //   Refer to the classdoc of StreamingSymmetricHashJoinExec about how stream-stream join
+        //   works.
+        // - Right side: for this side, the evaluation against inputs from the left side for this batch
+        //   is done at this point. Hence, adding the input to the state store can be skipped
+        //   if the input is going to be evicted in this batch. However, the input should be added to the
+        //   state store if it's a right outer join or full outer join, as unmatched output is
+        //   handled during state eviction.
+        val isLeftSemiWithMatch = joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
+        val shouldAddToState = if (isLeftSemiWithMatch) {
+          false
+        } else if (joinSide == LeftSide) {
+          true
+        } else {
+          // joinSide == RightSide
+
+          // if the input is not evicted in this batch (hence needs to be persisted)
+          val isNotEvictingInThisBatch =
+            !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
+
+          isNotEvictingInThisBatch ||

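The admission rule described by the new comment block in the hunk above can be modeled as a small pure function. This is a hedged sketch, not the PR's code: `Side`, `JoinKind`, and the `matched`/`evictable` flags are hypothetical stand-ins for Spark's `JoinSide`, join types, `iteratorNotEmpty`, and the two watermark predicates. Because the hunk is cut off after `isNotEvictingInThisBatch ||`, the outer-join clause below follows the comment's wording rather than the actual continuation.

```scala
// Hypothetical model of the state-store admission rule from the diff comment.
// These types are simplified stand-ins, not Spark's JoinSide / JoinType classes.
object AdmissionRule {
  sealed trait Side
  case object LeftSide extends Side
  case object RightSide extends Side

  sealed trait JoinKind
  case object Inner extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object FullOuter extends JoinKind
  case object LeftSemi extends JoinKind

  /**
   * @param side      which input the row came from
   * @param kind      the join type
   * @param matched   whether the row already produced join output (stand-in for iteratorNotEmpty)
   * @param evictable whether the key or value watermark predicate would evict the row in this batch
   * @return whether the row should be written to this side's state store
   */
  def shouldAddToState(side: Side, kind: JoinKind, matched: Boolean, evictable: Boolean): Boolean = {
    val isLeftSemiWithMatch = kind == LeftSemi && side == LeftSide && matched
    if (isLeftSemiWithMatch) {
      false
    } else if (side == LeftSide) {
      // This batch's right-side inputs have not been joined yet, so keep the row
      // even if the watermark would evict it at the end of the batch.
      true
    } else {
      // Right side: this batch's left-side inputs have already been joined.
      val isNotEvictingInThisBatch = !evictable
      // Assumed from the comment's wording: right/full outer joins keep the row,
      // since unmatched output is emitted during state eviction.
      isNotEvictingInThisBatch || kind == RightOuter || kind == FullOuter
    }
  }
}
```

For example, `shouldAddToState(RightSide, Inner, matched = false, evictable = true)` returns `false`, while the same call with `LeftSide` returns `true`: the asymmetry the review comment below asks about.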
Review Comment:
   Hm, why is `isNotEvictingInThisBatch` now limited to the right side instead of applying to both sides as before? If the reason it was removed from the left side is that
   
   > missing to add the input to state store in left side prevents the input on the right side to match with "that" input. Even though the input is going to be evicted in this batch, there could be still inputs on the right side in this batch which can match with that input.
   
   Doesn't the same reasoning apply to the right side as well?
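The ordering argument in the diff comment (left input is joined and stored first, then right input probes the left state) can be checked with a toy simulation of a single micro-batch. This is a sketch under stated assumptions, not Spark's implementation: it models only an inner join on an `Int` key, ignores state carried over from earlier batches, and uses a hypothetical `evictable` flag for rows the watermark would purge at the end of the batch.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of one micro-batch of an inner symmetric hash join on an Int key.
// Assumptions: left input is processed before right input; state from earlier
// batches is not modeled; `evictable` marks rows the watermark would remove
// at the end of this batch. Not Spark code.
object BatchOrderDemo {
  final case class Row(key: Int, evictable: Boolean)

  /** Runs one batch and returns the joined (leftKey, rightKey) pairs. */
  def runBatch(
      leftIn: Seq[Row],
      rightIn: Seq[Row],
      storeEvictableLeft: Boolean,
      storeEvictableRight: Boolean): Seq[(Int, Int)] = {
    val leftState = ArrayBuffer.empty[Row]
    val rightState = ArrayBuffer.empty[Row]
    val out = ArrayBuffer.empty[(Int, Int)]

    // Phase 1: each left row probes the right state (empty here), then may be stored.
    for (l <- leftIn) {
      rightState.filter(_.key == l.key).foreach(r => out += ((l.key, r.key)))
      if (!l.evictable || storeEvictableLeft) leftState += l
    }
    // Phase 2: each right row probes the left state, which now includes
    // the left rows stored in phase 1 of this same batch.
    for (r <- rightIn) {
      leftState.filter(_.key == r.key).foreach(l => out += ((l.key, r.key)))
      if (!r.evictable || storeEvictableRight) rightState += r
    }
    // End of batch (not modeled): evictable rows would be purged here, so a
    // stored evictable right row cannot match anything afterwards.
    out.toList
  }
}
```

With one evictable row on each side sharing key 1, skipping the left row drops the `(1, 1)` match, while skipping the right row leaves the batch's output unchanged. The model says nothing about outer joins, where the diff comment states the right-side row is still needed to emit unmatched output at eviction.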



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
