neilramaswamy commented on code in PR #44323:
URL: https://github.com/apache/spark/pull/44323#discussion_r1513508407


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -198,31 +198,50 @@ object StreamingSymmetricHashJoinHelper extends Logging {
     val joinKeyOrdinalForWatermark: Option[Int] = findJoinKeyOrdinalForWatermark(
       leftKeys, rightKeys)
 
+    // Returns a predicate that drops data less than the state watermark.
     def getOneSideStateWatermarkPredicate(
-        oneSideInputAttributes: Seq[Attribute],
-        oneSideJoinKeys: Seq[Expression],
-        otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = {
-      val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+        stateRemovalSideAttributes: Seq[Attribute],
+        stateRemovalSideJoinKeys: Seq[Expression],
+        watermarkSideAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = {
+      val watermarkAttribute = watermarkSideAttributes.find(_.metadata.contains(delayKey))
+      val isWatermarkDefinedOnInput = watermarkAttribute.isDefined

Review Comment:
   Maybe I don't understand the question, but this is the _core_ logic that was 
wrong. We needed to change the logic from checking whether the watermark 
exists on `oneSideInputAttributes` (now called `stateRemovalSideAttributes`) 
to checking whether it exists on `otherSideInputAttributes` (now called 
`watermarkSideAttributes`).
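
To make the difference concrete, here is a minimal sketch that does not depend on Spark. `Attr`, `hasDelayKey`, `WatermarkCheckSketch`, and the sample inputs are hypothetical stand-ins (the real code inspects `Attribute.metadata` for `delayKey`); only the choice of which side gets inspected mirrors the change in the diff.

```scala
// Hypothetical stand-in for Spark's Attribute: it only records whether the
// attribute's metadata contains the watermark delay key (delayKey in the diff).
final case class Attr(name: String, hasDelayKey: Boolean)

object WatermarkCheckSketch {
  // Old check: inspects the side whose state is being removed.
  def isDefinedOld(stateRemovalSide: Seq[Attr], watermarkSide: Seq[Attr]): Boolean =
    stateRemovalSide.exists(_.hasDelayKey)

  // New check: inspects the other side, the one expected to carry the watermark.
  def isDefinedNew(stateRemovalSide: Seq[Attr], watermarkSide: Seq[Attr]): Boolean =
    watermarkSide.find(_.hasDelayKey).isDefined

  // Illustrative inputs (assumed): only the right side has a watermarked column.
  val left: Seq[Attr] = Seq(Attr("leftKey", hasDelayKey = false))
  val right: Seq[Attr] = Seq(Attr("rightTime", hasDelayKey = true))
}
```

With these inputs, and `left` as the state-removal side, the old check looks at `left` and the new check looks at `right`, so the two disagree exactly when only one side carries the watermark.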



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

