viirya commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1805264699
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -668,13 +668,38 @@ case class StreamingSymmetricHashJoinExec(
private val iteratorNotEmpty: Boolean = super.hasNext
override def completion(): Unit = {
- val isLeftSemiWithMatch =
- joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
- // Add to state store only if both removal predicates do not match,
- // and the row is not matched for left side of left semi join.
- val shouldAddToState =
- !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
- !isLeftSemiWithMatch
+ // The criteria of whether the input has to be added into state store or not:
+ // - Left side: input can be skipped to be added to the state store if it's already matched
+ // and the join type is left semi.
+ // For other cases, the input should be added, including the case it's going to be evicted
+ // in this batch. It hasn't yet evaluated with inputs from right side for this batch.
+ // Refer to the classdoc of StreamingSymmetricHashJoinExec about how stream-stream join
+ // works.
+ // - Right side: for this side, the evaluation with inputs from left side for this batch
+ // is done at this point. That said, input can be skipped to be added to the state store
+ // if input is going to be evicted in this batch. Though, input should be added to the
+ // state store if it's right outer join or full outer join, as unmatched output is
+ // handled during state eviction.
+ val isLeftSemiWithMatch = joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
+ val shouldAddToState = if (isLeftSemiWithMatch) {
+ false
+ } else if (joinSide == LeftSide) {
+ true
+ } else {
+ // joinSide == RightSide
+
+ // if the input is not evicted in this batch (hence need to be persisted)
+ val isNotEvictingInThisBatch =
+ !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
+
+ isNotEvictingInThisBatch ||
Review Comment:
Hm, why is `isNotEvictingInThisBatch` now limited to the right side, instead of applying to both sides as before? Suppose the reason it was removed from the left side is this:
> missing to add the input to state store in left side prevents the input on the right side to match with "that" input. Even though the input is going to be evicted in this batch, there could be still inputs on the right side in this batch which can match with that input.

Doesn't the same reason also apply to the right side?
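To make the asymmetry in question concrete, here is a minimal, hypothetical Scala model of the `shouldAddToState` decision in the new `completion()` code. It is not Spark's code.

- The types `JoinSide` and `JoinKind` are local stand-ins, as are the object name `ShouldAddToStateSketch` and the Boolean parameters that replace `stateKeyWatermarkPredicateFunc(key)` and `stateValueWatermarkPredicateFunc(thisRow)`.
- The right/full outer condition in the last branch is an assumption inferred from the code comment. The diff hunk is cut off after `isNotEvictingInThisBatch ||`.

```scala
// Hypothetical stand-ins for Spark's join side and join type; not Spark's actual types.
sealed trait JoinSide
case object LeftSide extends JoinSide
case object RightSide extends JoinSide

sealed trait JoinKind
case object Inner extends JoinKind
case object LeftOuter extends JoinKind
case object RightOuter extends JoinKind
case object FullOuter extends JoinKind
case object LeftSemi extends JoinKind

object ShouldAddToStateSketch {
  def shouldAddToState(
      joinType: JoinKind,
      joinSide: JoinSide,
      iteratorNotEmpty: Boolean, // stand-in: this input produced at least one joined row
      keyEvicting: Boolean,      // stand-in for stateKeyWatermarkPredicateFunc(key)
      valueEvicting: Boolean     // stand-in for stateValueWatermarkPredicateFunc(thisRow)
  ): Boolean = {
    val isLeftSemiWithMatch = joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
    if (isLeftSemiWithMatch) {
      // A matched left-semi input never needs to be stored.
      false
    } else if (joinSide == LeftSide) {
      // Left input is stored even if it will be evicted: per the diff comment,
      // right-side inputs of this batch have not been joined against it yet.
      true
    } else {
      val isNotEvictingInThisBatch = !keyEvicting && !valueEvicting
      // Assumption (from the comment, since the hunk is truncated): right/full outer
      // joins keep the row so eviction can emit the unmatched output.
      isNotEvictingInThisBatch || joinType == RightOuter || joinType == FullOuter
    }
  }

  def main(args: Array[String]): Unit = {
    // Evicting left input, inner join: still stored.
    println(shouldAddToState(Inner, LeftSide, iteratorNotEmpty = false, keyEvicting = true, valueEvicting = false))      // true
    // Evicting right input, inner join: skipped.
    println(shouldAddToState(Inner, RightSide, iteratorNotEmpty = false, keyEvicting = true, valueEvicting = false))     // false
    // Evicting right input, right outer join: stored for unmatched-row output at eviction.
    println(shouldAddToState(RightOuter, RightSide, iteratorNotEmpty = false, keyEvicting = true, valueEvicting = false)) // true
    // Matched left-semi input: skipped.
    println(shouldAddToState(LeftSemi, LeftSide, iteratorNotEmpty = true, keyEvicting = false, valueEvicting = false))   // false
  }
}
```

In this model, an input that will be evicted is dropped only on the right side. According to the diff comment, that is safe because left-side inputs of the batch have already been evaluated against right-side inputs by that point. The sketch does not model that processing order; it only encodes the resulting decision.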
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.