neilramaswamy commented on code in PR #44323:
URL: https://github.com/apache/spark/pull/44323#discussion_r1513523534


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -198,18 +198,23 @@ object StreamingSymmetricHashJoinHelper extends Logging {
     val joinKeyOrdinalForWatermark: Option[Int] = 
findJoinKeyOrdinalForWatermark(
       leftKeys, rightKeys)
 
+    // Returns a predicate that drops data less than the state watermark.
+    // oneSideInputAttributes are the attributes to base the state watermark 
off of, while
+    // otherSideInputAttributes are the attributes on which the watermark is 
defined.
     def getOneSideStateWatermarkPredicate(
         oneSideInputAttributes: Seq[Attribute],
         oneSideJoinKeys: Seq[Expression],
         otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
-      val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+      val watermarkAttribute = 
otherSideInputAttributes.find(_.metadata.contains(delayKey))
+      val isWatermarkDefinedOnInput = watermarkAttribute.isDefined
       val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined
 
       if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 in the 
StreamingSymmetricHashJoinExec docs
         val keyExprWithWatermark = BoundReference(
           joinKeyOrdinalForWatermark.get,
           oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
-          oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+          oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable
+        )

Review Comment:
   ```suggestion
             oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to