rangadi commented on code in PR #44323:
URL: https://github.com/apache/spark/pull/44323#discussion_r1566712758


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -219,10 +222,35 @@ object StreamingSymmetricHashJoinHelper extends Logging {
           attributesWithEventWatermark = 
AttributeSet(otherSideInputAttributes),
           condition,
           eventTimeWatermarkForEviction)
-        val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
-        val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
-        expr.map(JoinStateValueWatermarkPredicate.apply _)
 
+        // For example, if the condition is of the form:
+        //    left_time > right_time + INTERVAL 30 MINUTES
+        // Then this extracts left_time and right_time.
+        val attributesInCondition = AttributeSet(
+          condition.get.collect { case a: AttributeReference => a }
+        )
+
+        // Construct an AttributeSet so that we can perform equality between 
attributes,
+        // which we do in the filter below.
+        val oneSideInputAttributeSet = AttributeSet(oneSideInputAttributes)
+
+        // oneSideInputAttributes could be [left_value, left_time], and we just
+        // want the attribute _in_ the time-interval condition.
+        val oneSideStateWatermarkAttributes = attributesInCondition.filter { a 
=>
+            oneSideInputAttributeSet.contains(a)

Review Comment:
   What ensures that this is the event-time attribute? Is it guaranteed to be 
the `left_time` mentioned in the comment?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -219,10 +222,35 @@ object StreamingSymmetricHashJoinHelper extends Logging {
           attributesWithEventWatermark = 
AttributeSet(otherSideInputAttributes),
           condition,
           eventTimeWatermarkForEviction)
-        val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
-        val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
-        expr.map(JoinStateValueWatermarkPredicate.apply _)
 
+        // For example, if the condition is of the form:
+        //    left_time > right_time + INTERVAL 30 MINUTES
+        // Then this extracts left_time and right_time.
+        val attributesInCondition = AttributeSet(
+          condition.get.collect { case a: AttributeReference => a }
+        )
+
+        // Construct an AttributeSet so that we can perform equality between 
attributes,
+        // which we do in the filter below.
+        val oneSideInputAttributeSet = AttributeSet(oneSideInputAttributes)
+
+        // oneSideInputAttributes could be [left_value, left_time], and we just
+        // want the attribute _in_ the time-interval condition.
+        val oneSideStateWatermarkAttributes = attributesInCondition.filter { a 
=>
+            oneSideInputAttributeSet.contains(a)
+        }
+
+        // There should be a single attribute per side in the time-interval 
condition, so,

Review Comment:
   Where is this invariant ('single attribute per side') enforced?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -219,10 +222,35 @@ object StreamingSymmetricHashJoinHelper extends Logging {
           attributesWithEventWatermark = 
AttributeSet(otherSideInputAttributes),
           condition,
           eventTimeWatermarkForEviction)
-        val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))

Review Comment:
   Where is the equivalent of this removed statement in the new code? It looks 
like it selected the event-time attribute from one side (the left side in the 
examples).



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -219,10 +222,35 @@ object StreamingSymmetricHashJoinHelper extends Logging {
           attributesWithEventWatermark = 
AttributeSet(otherSideInputAttributes),
           condition,
           eventTimeWatermarkForEviction)
-        val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
-        val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
-        expr.map(JoinStateValueWatermarkPredicate.apply _)
 
+        // For example, if the condition is of the form:
+        //    left_time > right_time + INTERVAL 30 MINUTES
+        // Then this extracts left_time and right_time.
+        val attributesInCondition = AttributeSet(
+          condition.get.collect { case a: AttributeReference => a }
+        )
+
+        // Construct an AttributeSet so that we can perform equality between 
attributes,
+        // which we do in the filter below.
+        val oneSideInputAttributeSet = AttributeSet(oneSideInputAttributes)
+
+        // oneSideInputAttributes could be [left_value, left_time], and we just
+        // want the attribute _in_ the time-interval condition.
+        val oneSideStateWatermarkAttributes = attributesInCondition.filter { a 
=>
+            oneSideInputAttributeSet.contains(a)
+        }
+
+        // There should be a single attribute per side in the time-interval 
condition, so,
+        // filtering for oneSideInputAttributes as done above should leave us 
with 1 attribute.
+        if (oneSideStateWatermarkAttributes.size == 1) {
+          val expr = watermarkExpression(
+            Some(oneSideStateWatermarkAttributes.head), stateValueWatermark)
+          expr.map(JoinStateValueWatermarkPredicate.apply _)
+        } else {
+          // But, if there are more state watermark attributes, we can't solve 
the constraints,

Review Comment:
   An example would be useful here (e.g., a condition that yields more than one 
state watermark attribute for a side).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to