viirya commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1805276988
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -668,13 +668,38 @@ case class StreamingSymmetricHashJoinExec(
private val iteratorNotEmpty: Boolean = super.hasNext
override def completion(): Unit = {
- val isLeftSemiWithMatch =
- joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
- // Add to state store only if both removal predicates do not match,
- // and the row is not matched for left side of left semi join.
- val shouldAddToState =
- !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
- !isLeftSemiWithMatch
+ // The criteria for whether the input has to be added to the state store:
+ // - Left side: input can skip being added to the state store if it's already matched
+ // and the join type is left semi.
+ // In all other cases, the input should be added, including when it's going to be evicted
+ // in this batch, because it hasn't yet been evaluated against inputs from the right side for this batch.
+ // Refer to the classdoc of StreamingSymmetricHashJoinExec for how stream-stream join
+ // works.
+ // - Right side: for this side, the evaluation against inputs from the left side for this batch
+ // is done at this point. That is, input can skip being added to the state store
+ // if it's going to be evicted in this batch. However, input should still be added to the
+ // state store for a right outer or full outer join, as unmatched output is
+ // handled during state eviction.
+ val isLeftSemiWithMatch = joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
+ val shouldAddToState = if (isLeftSemiWithMatch) {
+ false
+ } else if (joinSide == LeftSide) {
+ true
+ } else {
+ // joinSide == RightSide
+
+ // if the input is not evicted in this batch (hence needs to be persisted)
+ val isNotEvictingInThisBatch =
+ !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
+
+ isNotEvictingInThisBatch ||
Review Comment:
Oh, nvm: rightSideJoiner already joins the new right input with the new left input that was just stored (and which previously was wrongly NOT stored).
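For readers following the thread, the admission rule described in the new code comment can be sketched as a standalone function. This is a simplified sketch, not the PR's code: `JoinType`, `JoinSide`, `StateAdmission`, and the `matched`/`evictedThisBatch` flags are hypothetical stand-ins for Spark's join types, `iteratorNotEmpty`, and the two watermark predicates. The outer-join branch follows the comment's prose, since the diff excerpt is cut off at `isNotEvictingInThisBatch ||`.

```scala
// Hypothetical, simplified stand-ins for Spark's join types and join sides.
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType

sealed trait JoinSide
case object LeftSide extends JoinSide
case object RightSide extends JoinSide

object StateAdmission {
  /**
   * Sketch of the rule from the diff's comment (assumed, not Spark's API).
   *
   * @param joinType         join type of the stream-stream join
   * @param joinSide         side the input row arrived on
   * @param matched          whether the row produced join output in this batch
   *                         (stands in for iteratorNotEmpty)
   * @param evictedThisBatch whether the key or value watermark predicate says the
   *                         row will be evicted at the end of this batch
   */
  def shouldAddToState(
      joinType: JoinType,
      joinSide: JoinSide,
      matched: Boolean,
      evictedThisBatch: Boolean): Boolean = {
    val isLeftSemiWithMatch = joinType == LeftSemi && joinSide == LeftSide && matched
    if (isLeftSemiWithMatch) {
      // A matched left-semi row has already been emitted; no need to keep it.
      false
    } else if (joinSide == LeftSide) {
      // Left input has not yet been joined with this batch's right input,
      // so it must be stored even if it will be evicted in this batch.
      true
    } else {
      // Right input has already been joined with this batch's left input.
      // Keep it if it survives eviction, or if eviction must emit unmatched
      // rows for right outer / full outer joins.
      !evictedThisBatch || joinType == RightOuter || joinType == FullOuter
    }
  }
}
```

Keeping the left-side branch unconditional reflects the point above: the left side is processed first, so a left row evicted in this batch must still be visible to rightSideJoiner.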