neilramaswamy commented on code in PR #44323:
URL: https://github.com/apache/spark/pull/44323#discussion_r1513506350


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -198,31 +198,50 @@ object StreamingSymmetricHashJoinHelper extends Logging {
     val joinKeyOrdinalForWatermark: Option[Int] = findJoinKeyOrdinalForWatermark(
       leftKeys, rightKeys)
 
+    // Returns a predicate that drops data less than the state watermark.
     def getOneSideStateWatermarkPredicate(
-        oneSideInputAttributes: Seq[Attribute],
-        oneSideJoinKeys: Seq[Expression],
-        otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = {
-      val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+        stateRemovalSideAttributes: Seq[Attribute],

Review Comment:
   They're not strictly needed, but a possible reason this mistake was made 
in the first place is that you're working with "one side" and "other side", 
which mean nothing if you don't have the code context.
   
   I will definitely address this in a follow-up, since these names are very 
confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

