HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1116629491
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -699,4 +703,18 @@ case class StreamingSymmetricHashJoinExec(
} else {
Nil
}
+
+ // This operator will evict based on the state watermark on both side of inputs; we would like
+ // to let users leverage both sides of event time column for output of join, so the watermark
+ // must be lower bound of both sides of event time column. The lower bound of event time column
+ // for each side is determined by state watermark, hence we take a minimum of (left state
+ // watermark, right state watermark, input watermark) to decide the output watermark.
+ override def produceOutputWatermark(inputWatermarkMs: Long): Long = {
Review Comment:
We provide a single input watermark value, computed as the minimum of the
input watermarks of all upstreams. We don't try to make this very tight: we
simply use the global watermark as the origin watermark rather than treating
each individual watermark node as an origin, which is already quite loose.
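A minimal sketch of the arithmetic discussed above, assuming all watermarks are plain epoch-millisecond `Long` values. The names `WatermarkSketch`, `inputWatermark` and `outputWatermark` are hypothetical and are not Spark APIs. How the per-side state watermarks are derived is out of scope here; they are simply passed in as values.

```scala
// Hypothetical sketch, not Spark code: illustrates the min-based watermark
// propagation described in the diff comment and the review reply.
object WatermarkSketch {

  // The single input watermark given to the operator: the minimum of the
  // watermarks of all upstreams (assumes at least one upstream).
  def inputWatermark(upstreamWatermarksMs: Seq[Long]): Long = {
    require(upstreamWatermarksMs.nonEmpty, "at least one upstream watermark is required")
    upstreamWatermarksMs.min
  }

  // The join's output watermark: the minimum of the left state watermark,
  // the right state watermark, and the input watermark, so it never exceeds
  // the lower bound of the event time column on either side.
  def outputWatermark(
      inputWatermarkMs: Long,
      leftStateWatermarkMs: Long,
      rightStateWatermarkMs: Long): Long =
    math.min(inputWatermarkMs, math.min(leftStateWatermarkMs, rightStateWatermarkMs))

  def main(args: Array[String]): Unit = {
    // Two upstreams at 10000 ms and 7000 ms yield an input watermark of 7000 ms.
    val input = inputWatermark(Seq(10000L, 7000L))
    // State watermarks of 5000 ms (left) and 6000 ms (right) cap the output at 5000 ms.
    println(outputWatermark(input, 5000L, 6000L))
  }
}
```

Taking the minimum everywhere keeps the bound conservative: downstream operators may see a watermark lower than strictly necessary, which matches the reply's point that the global-watermark origin is already loose.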
--
This is an automated message from the Apache Git Service.