viirya commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1805276988


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -668,13 +668,38 @@ case class StreamingSymmetricHashJoinExec(
       private val iteratorNotEmpty: Boolean = super.hasNext
 
       override def completion(): Unit = {
-        val isLeftSemiWithMatch =
-          joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
-        // Add to state store only if both removal predicates do not match,
-        // and the row is not matched for left side of left semi join.
-        val shouldAddToState =
-          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
-          !isLeftSemiWithMatch
+        // The criteria of whether the input has to be added into state store or not:
+        // - Left side: input can be skipped to be added to the state store if it's already matched
+        //   and the join type is left semi.
+        //   For other cases, the input should be added, including the case it's going to be evicted
+        //   in this batch. It hasn't yet evaluated with inputs from right side for this batch.
+        //   Refer to the classdoc of SteramingSymmetricHashJoinExec about how stream-stream join
+        //   works.
+        // - Right side: for this side, the evaluation with inputs from left side for this batch
+        //   is done at this point. That said, input can be skipped to be added to the state store
+        //   if input is going to be evicted in this batch. Though, input should be added to the
+        //   state store if it's right outer join or full outer join, as unmatched output is
+        //   handled during state eviction.
+        val isLeftSemiWithMatch = joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
+        val shouldAddToState = if (isLeftSemiWithMatch) {
+          false
+        } else if (joinSide == LeftSide) {
+          true
+        } else {
+          // joinSide == RightSide
+
+          // if the input is not evicted in this batch (hence need to be persisted)
+          val isNotEvictingInThisBatch =
+            !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
+
+          isNotEvictingInThisBatch ||

Review Comment:
   Oh, never mind. rightSideJoiner has already joined the new right input with the new left input,
   which has just been stored (previously it was wrongly NOT stored).
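
For reference, the decision described in the new code comment can be sketched as a standalone function. This is only an illustration under assumptions: the `JoinType` and `JoinSide` definitions below are hypothetical stand-ins rather than Spark's classes, `hasMatch` plays the role of `iteratorNotEmpty`, and `evictedInThisBatch` stands for either watermark predicate matching. The right-side clause after `||` is cut off in the quoted diff, so the outer-join condition here is taken from the code comment (right outer or full outer), not from the actual patch.

```scala
// Hypothetical stand-ins for the join types and sides; NOT Spark's own classes.
sealed trait JoinType
case object Inner extends JoinType
case object LeftSemi extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

sealed trait JoinSide
case object LeftSide extends JoinSide
case object RightSide extends JoinSide

object StateAdditionSketch {
  // Decides whether an input row should be written to the state store.
  //   hasMatch:           the row produced at least one joined output (iteratorNotEmpty)
  //   evictedInThisBatch: a key or value watermark predicate matches the row
  def shouldAddToState(
      joinType: JoinType,
      joinSide: JoinSide,
      hasMatch: Boolean,
      evictedInThisBatch: Boolean): Boolean = {
    val isLeftSemiWithMatch =
      joinType == LeftSemi && joinSide == LeftSide && hasMatch
    if (isLeftSemiWithMatch) {
      // An already matched left-semi row never needs to be matched again.
      false
    } else if (joinSide == LeftSide) {
      // Left input has not yet been evaluated against this batch's right input,
      // so it is stored even if it will be evicted in this batch.
      true
    } else {
      // Right input has already been evaluated against this batch's left input.
      // Assumption from the code comment: outer joins emit unmatched rows during
      // state eviction, so those rows must still be stored.
      val needsUnmatchedOutput = joinType == RightOuter || joinType == FullOuter
      !evictedInThisBatch || needsUnmatchedOutput
    }
  }
}
```

Under this sketch, a late left-side row in an inner join is still stored, which is the case that was previously skipped, while a late right-side row in an inner join is not.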



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

