jerrypeng commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1116601274
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -699,4 +703,18 @@ case class StreamingSymmetricHashJoinExec(
} else {
Nil
}
+
+ // This operator will evict based on the state watermark on both side of inputs; we would like
+ // to let users leverage both sides of event time column for output of join, so the watermark
+ // must be lower bound of both sides of event time column. The lower bound of event time column
+ // for each side is determined by state watermark, hence we take a minimum of (left state
+ // watermark, right state watermark, input watermark) to decide the output watermark.
+ override def produceOutputWatermark(inputWatermarkMs: Long): Long = {
Review Comment:
I am a little confused about what the input watermark for the join is supposed
to be. I would expect there to be two input watermarks, one from each side?
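
To make the question concrete, here is a minimal sketch of how the code comment reads: the output watermark is the minimum of the single input watermark and whichever per-side state watermarks are defined. The object name, the `Option[Long]` parameters, and the helper itself are hypothetical and only illustrate the comment; they are not the signature in this PR. Whether `inputWatermarkMs` already combines both sides is exactly what the question above is about, so the sketch treats it as given.

```scala
// Hypothetical sketch, not Spark's implementation: all names and the
// Option-based parameters are assumptions made for illustration only.
object OutputWatermarkSketch {
  // Returns the minimum of the input watermark and any defined state watermarks.
  def outputWatermark(
      inputWatermarkMs: Long,
      leftStateWatermarkMs: Option[Long],
      rightStateWatermarkMs: Option[Long]): Long = {
    // A side without a state watermark (None) contributes nothing to the minimum.
    (Seq(inputWatermarkMs) ++ leftStateWatermarkMs ++ rightStateWatermarkMs).min
  }
}
```

With this reading, a side whose state watermark lags behind holds the output watermark back, and if neither side defines a state watermark the input watermark passes through unchanged.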