jerrypeng commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1116601274


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -699,4 +703,18 @@ case class StreamingSymmetricHashJoinExec(
     } else {
       Nil
     }
+
+  // This operator will evict based on the state watermark on both side of inputs; we would like
+  // to let users leverage both sides of event time column for output of join, so the watermark
+  // must be lower bound of both sides of event time column. The lower bound of event time column
+  // for each side is determined by state watermark, hence we take a minimum of (left state
+  // watermark, right state watermark, input watermark) to decide the output watermark.
+  override def produceOutputWatermark(inputWatermarkMs: Long): Long = {

Review Comment:
   I am a little confused about what the input watermark for join is supposed to be. I would expect there to be two input watermarks, one from each side?
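To make the rule in the diff comment concrete, here is a minimal Scala sketch. It is not the code under review. The object and method names (`JoinOutputWatermarkSketch`, `produceOutputWatermarkFromTwoInputs`) are hypothetical, and modelling the per-side state watermarks as `Option[Long]` is an assumption. The second method shows one possible reading of the question above. If each side had its own input watermark, folding them together with `min` reduces the problem to the single-input signature. `min` is also the default policy of `spark.sql.streaming.multipleWatermarkPolicy`.

```scala
// Hypothetical sketch, not Spark's implementation: names and signatures are
// illustrative assumptions only.
object JoinOutputWatermarkSketch {

  // Rule from the diff comment: output watermark = min(left state watermark,
  // right state watermark, input watermark). A side without a state watermark
  // (None) contributes no extra bound.
  def produceOutputWatermark(
      inputWatermarkMs: Long,
      leftStateWatermarkMs: Option[Long],
      rightStateWatermarkMs: Option[Long]): Long =
    (Seq(inputWatermarkMs) ++ leftStateWatermarkMs.toSeq ++ rightStateWatermarkMs.toSeq).min

  // Assumed variant with one input watermark per side: combine them with min
  // first, then apply the same rule as above.
  def produceOutputWatermarkFromTwoInputs(
      leftInputWatermarkMs: Long,
      rightInputWatermarkMs: Long,
      leftStateWatermarkMs: Option[Long],
      rightStateWatermarkMs: Option[Long]): Long =
    produceOutputWatermark(
      math.min(leftInputWatermarkMs, rightInputWatermarkMs),
      leftStateWatermarkMs,
      rightStateWatermarkMs)

  def main(args: Array[String]): Unit = {
    // Left state watermark is the lowest bound.
    println(produceOutputWatermark(10000L, Some(8000L), Some(9000L))) // prints 8000
    // No state watermarks: output equals the input watermark.
    println(produceOutputWatermark(10000L, None, None)) // prints 10000
    // Right input watermark (7000) is below both state watermarks.
    println(produceOutputWatermarkFromTwoInputs(12000L, 7000L, Some(8000L), Some(9000L))) // prints 7000
  }
}
```

Taking the minimum keeps the output watermark a lower bound for event times on both sides. This matches the comment's reasoning. It is also the more conservative choice when the inputs disagree.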



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
