Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/21701#discussion_r201071243
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
---
@@ -20,15 +20,66 @@ package org.apache.spark.sql.execution.streaming
import scala.collection.mutable
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.internal.SQLConf
-class WatermarkTracker extends Logging {
+/**
+ * Policy to define how to choose a new global watermark value if there are
+ * multiple watermark operators in a streaming query.
+ */
+sealed trait MultipleWatermarkPolicy {
+ def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
+}
+
+object MultipleWatermarkPolicy {
+ val DEFAULT_POLICY_NAME = "min"
+
+ def apply(policyName: String): MultipleWatermarkPolicy = {
+ policyName.toLowerCase match {
+ case DEFAULT_POLICY_NAME => MinWatermark
+ case "max" => MaxWatermark
+ case _ =>
+ throw new IllegalArgumentException(s"Could not recognize watermark policy '$policyName'")
+ }
+ }
+}
+
+/**
+ * Policy to choose the *min* of the operator watermark values as the global watermark value.
+ * Note that this is the safe (hence default) policy as the global watermark will advance
+ * only if all the individual operator watermarks have advanced. In other words, in a
+ * streaming query with multiple input streams and watermarks defined on all of them,
+ * the global watermark will advance as slowly as the slowest input. So if there is watermark
+ * based state cleanup or late-data dropping, then this policy is the most conservative one.
+ */
+case object MinWatermark extends MultipleWatermarkPolicy {
+ def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
+ if (operatorWatermarks.nonEmpty) operatorWatermarks.min else 0
--- End diff --
It took me a while to figure out how `0` works here, and then I realized that
`operatorWatermarks` is never empty. Should we add an assertion rather
than returning `0`?
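
A minimal sketch of what such an assertion might look like, assuming callers really never pass an empty `Seq` (that invariant is taken from the comment above and is not verified here). The trait and object names mirror the diff; the assertion message is hypothetical, and this is not necessarily the code that was merged.

```scala
// Sketch only: names mirror the diff above; the message text is illustrative.
sealed trait MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
}

case object MinWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    // Assumed invariant: at least one operator watermark is always supplied.
    // Fail loudly instead of silently falling back to 0.
    assert(operatorWatermarks.nonEmpty, "operatorWatermarks must not be empty")
    operatorWatermarks.min
  }
}
```

Note that Scala's `assert` can be elided at compile time (for example with `-Xdisable-assertions`), in which case an empty input would instead fail inside `min` with an `UnsupportedOperationException`. If the check has to survive such builds, `require` may be the better fit.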
---