jerrypeng commented on code in PR #39931: URL: https://github.com/apache/spark/pull/39931#discussion_r1116601274
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ##########
@@ -699,4 +703,18 @@ case class StreamingSymmetricHashJoinExec(
     } else {
       Nil
     }
+
+  // This operator will evict based on the state watermark on both side of inputs; we would like
+  // to let users leverage both sides of event time column for output of join, so the watermark
+  // must be lower bound of both sides of event time column. The lower bound of event time column
+  // for each side is determined by state watermark, hence we take a minimum of (left state
+  // watermark, right state watermark, input watermark) to decide the output watermark.
+  override def produceOutputWatermark(inputWatermarkMs: Long): Long = {

Review Comment:
I am a little confused about what the input watermark for a join is supposed to be. I would expect there to be two input watermarks, one from each side?
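To make the question concrete, here is a minimal standalone sketch of the rule the code comment describes: the output watermark is the minimum of (left state watermark, right state watermark, input watermark). It is not Spark's implementation. `JoinStateWatermarks` and `combineInputWatermarks` are hypothetical names. The sketch also assumes, without confirmation from this diff, that the single `inputWatermarkMs` a two-input operator receives has already been reduced from its two children's watermarks by taking their minimum. If that assumption holds, the two per-side input watermarks the comment asks about would be folded into one value before `produceOutputWatermark` is called.

```scala
// Hypothetical, simplified model; these are not Spark's actual classes or APIs.
object JoinOutputWatermarkSketch {
  // Assumption: each side's state watermark is optional, e.g. absent when that
  // side has no usable event-time watermark for state eviction.
  final case class JoinStateWatermarks(left: Option[Long], right: Option[Long])

  // Assumption: a two-input operator gets a single input watermark, computed as
  // the minimum of the output watermarks of its left and right children.
  def combineInputWatermarks(leftChildMs: Long, rightChildMs: Long): Long =
    math.min(leftChildMs, rightChildMs)

  // The output watermark is a lower bound on the event time of every row the join
  // can still emit, so it takes the minimum of the input watermark and whichever
  // state watermarks are defined.
  def produceOutputWatermark(inputWatermarkMs: Long, state: JoinStateWatermarks): Long =
    (Seq(inputWatermarkMs) ++ state.left ++ state.right).min

  def main(args: Array[String]): Unit = {
    val input = combineInputWatermarks(leftChildMs = 20000L, rightChildMs = 15000L)
    val output = produceOutputWatermark(input, JoinStateWatermarks(Some(10000L), Some(12000L)))
    println(s"input=$input output=$output") // prints "input=15000 output=10000"
  }
}
```

In this model the state watermarks can only lower the result, never raise it above the input watermark. That matches the comment's goal of keeping the output watermark a valid lower bound for the event-time columns of both sides.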