anishshri-db commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r840149856



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -360,10 +374,25 @@ case class StreamingSymmetricHashJoinExec(
             case 2 => kv.matched
             case _ => throwBadStateFormatVersionException()
           }
-        val leftSideOutputIter = leftSideJoiner.removeOldState().filterNot(
-          isKeyToValuePairMatched).map(pair => 
joinedRow.withLeft(pair.value).withRight(nullRight))
-        val rightSideOutputIter = rightSideJoiner.removeOldState().filterNot(
-          isKeyToValuePairMatched).map(pair => 
joinedRow.withLeft(nullLeft).withRight(pair.value))
+
+        val leftSideInitIterFn = { () =>
+          val removedRowIter = leftSideJoiner.removeOldState()
+          removedRowIter.filterNot(isKeyToValuePairMatched)
+            .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        }
+
+        val rightSideInitIterFn = { () =>
+          val removedRowIter = rightSideJoiner.removeOldState()
+          removedRowIter.filterNot(isKeyToValuePairMatched)
+            .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+        }
+
+        // NOTE: we need to make sure both `leftSideOutputIter` and 
`rightSideOutputIter` are
+        // evaluated "after" exhausting all of elements in 
`hashJoinOutputIter`, otherwise it may
+        // lead to out of sync according to the interface contract on 
StateStore.iterator and
+        // end up with correctness issue. Please refer SPARK-38684 for more 
details.

Review comment:
       Should we add the comment here as well ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to