neilramaswamy commented on code in PR #44323:
URL: https://github.com/apache/spark/pull/44323#discussion_r1567467392
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -219,10 +222,35 @@ object StreamingSymmetricHashJoinHelper extends Logging {
attributesWithEventWatermark =
AttributeSet(otherSideInputAttributes),
condition,
eventTimeWatermarkForEviction)
- val inputAttributeWithWatermark =
oneSideInputAttributes.find(_.metadata.contains(delayKey))
Review Comment:
The code previously made the oversimplified assumption that the state removal
side would itself carry a watermark, and it relied on that assumption to find
the attribute to use for state removal.
This assumption does not hold. If you have the expression `L > R + 10` and
`R` has a watermark, how do you find `L`? The approach I have chosen is to take
the attributes in the condition (`attributesInCondition`) and keep only those
that belong to the removal side (`oneSideInputAttributes`).
That would leave us with `L`.
Effectively, this line is equivalent to
`oneSideStateWatermarkAttributes.head`.
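To make the filtering concrete, here is a minimal, self-contained sketch. It
does not use Spark's classes: `Attr`, `RemovalAttributeSketch`,
`removalAttribute`, and the extra columns `X` and `Y` are hypothetical
stand-ins for illustration, and `headOption` is used instead of `.head` so an
empty result does not throw.

```scala
// Hypothetical stand-in for Spark's Attribute; only the name is modelled.
final case class Attr(name: String)

object RemovalAttributeSketch {
  // Keep the condition's attributes that belong to one side's input,
  // then take the first one (None if the condition references none).
  def removalAttribute(
      attributesInCondition: Seq[Attr],
      oneSideInputAttributes: Set[Attr]): Option[Attr] =
    attributesInCondition.filter(oneSideInputAttributes.contains).headOption

  // The condition `L > R + 10` references both L and R.
  val attributesInCondition: Seq[Attr] = Seq(Attr("L"), Attr("R"))

  // Illustrative inputs; X and Y are columns the condition does not mention.
  val leftInput: Set[Attr] = Set(Attr("L"), Attr("X"))
  val rightInput: Set[Attr] = Set(Attr("R"), Attr("Y"))

  // Evaluated per side: the result depends only on which side's input is passed in.
  val leftRemoval: Option[Attr] = removalAttribute(attributesInCondition, leftInput)
  val rightRemoval: Option[Attr] = removalAttribute(attributesInCondition, rightInput)
}
```

Note that the lookup never consults watermark metadata on the removal side,
which is the point of the change: the attribute is found from the condition
regardless of which side carries the watermark.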
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]