GitHub user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21253#discussion_r188703005
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -187,6 +187,12 @@ case class StreamingSymmetricHashJoinExec(
s"${getClass.getSimpleName} should not take $x as the JoinType")
}
+ override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+ (stateWatermarkPredicates.left.nonEmpty || stateWatermarkPredicates.right.nonEmpty) &&
+ eventTimeWatermark.isDefined &&
--- End diff --
nit: we should clearly document that eventTimeWatermark is the watermark from before the current batch.
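A minimal Scala sketch of the distinction this nit asks to document, under a simplified model. NextBatchMetadata and JoinBatchSketch are hypothetical stand-ins, not Spark classes. Because the diff is cut off after `eventTimeWatermark.isDefined &&`, the final comparison (the new batch's watermark strictly greater than the one used before it) is an assumption here, not the PR's code.

```scala
// Hypothetical stand-in for Spark's OffsetSeqMetadata: only the field this sketch uses.
final case class NextBatchMetadata(batchWatermarkMs: Long)

object JoinBatchSketch {
  /**
   * Decides whether an extra (possibly empty) batch should run so that join state
   * can be cleaned up with the newer watermark.
   *
   * @param hasLeftStatePredicate  a watermark predicate exists for left-side state
   * @param hasRightStatePredicate a watermark predicate exists for right-side state
   * @param eventTimeWatermark     the watermark in effect BEFORE the batch described by
   *                               `newMetadata`, i.e. the one the last executed plan used
   * @param newMetadata            metadata (including the new watermark) of the next batch
   */
  def shouldRunAnotherBatch(
      hasLeftStatePredicate: Boolean,
      hasRightStatePredicate: Boolean,
      eventTimeWatermark: Option[Long],
      newMetadata: NextBatchMetadata): Boolean = {
    // State cleanup only happens if at least one side has a watermark predicate.
    val watermarkUsedForStateCleanup = hasLeftStatePredicate || hasRightStatePredicate
    // Assumption: another batch is only useful if the watermark has moved forward
    // relative to the previous one; `exists` also covers the "isDefined" check.
    val watermarkAdvanced = eventTimeWatermark.exists(prev => newMetadata.batchWatermarkMs > prev)
    watermarkUsedForStateCleanup && watermarkAdvanced
  }
}
```

Naming the parameter's meaning in the doc comment makes the "old vs. new watermark" comparison readable without having to trace where eventTimeWatermark was set.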