HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1107127363
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -699,4 +701,18 @@ case class StreamingSymmetricHashJoinExec(
} else {
Nil
}
+
+ // This operator will evict based on the state watermark on both side of inputs; we would like
+ // to let users leverage both sides of event time column for output of join, so the watermark
+ // must be lower bound of both sides of event time column. The lower bound of event time column
+ // for each side is determined by state watermark, hence we take a minimum of (left state
+ // watermark, right state watermark, input watermark) to decide the output watermark.
+ override def produceWatermark(inputWatermarkMs: Long): Long = {
Review Comment:
That's the minimum of the output watermarks of the upstream operators
(each of which is an input watermark for this operator).
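
To make the two minimums concrete, here is a rough sketch of how I read it. It is not the code in this PR: the object name, the helper names, and the `Option`-typed state watermarks are assumptions made only for illustration.

```scala
// Hypothetical sketch, not the implementation in the PR.
object OutputWatermarkSketch {

  // Assumption: the input watermark of an operator is the minimum of the
  // output watermarks produced by its upstream (child) operators.
  def inputWatermark(upstreamOutputWatermarksMs: Seq[Long]): Long = {
    require(upstreamOutputWatermarksMs.nonEmpty, "need at least one upstream operator")
    upstreamOutputWatermarksMs.min
  }

  // Output watermark of the join: the minimum of the input watermark and
  // whichever state watermarks are defined. Modelling a missing state
  // watermark as None is an assumption of this sketch.
  def produceWatermark(
      inputWatermarkMs: Long,
      leftStateWatermarkMs: Option[Long],
      rightStateWatermarkMs: Option[Long]): Long = {
    // The input watermark is always present, so the sequence is never empty.
    (Seq(inputWatermarkMs) ++ leftStateWatermarkMs ++ rightStateWatermarkMs).min
  }
}
```

With upstream output watermarks of 1000 and 1200, the input watermark in this sketch is 1000; if the left and right state watermarks are 800 and 900, the output watermark is 800, so it never advances past rows that either side of the state may still emit.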
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]