Repository: spark
Updated Branches:
  refs/heads/master 32cb50835 -> 6078b891d


[SPARK-24730][SS] Add policy to choose max as global watermark when streaming 
query has multiple watermarks

## What changes were proposed in this pull request?

Currently, when a streaming query has multiple watermark, the policy is to 
choose the min of them as the global watermark. This is safe to do as the 
global watermark moves with the slowest stream, and is therefore is safe as it 
does not unexpectedly drop some data as late, etc. While this is indeed the 
safe thing to do, in some cases, you may want the watermark to advance with the 
fastest stream, that is, take the max of multiple watermarks. This PR is to add 
that configuration. It makes the following changes.

- Adds a configuration to specify max as the policy.
- Saves the configuration in OffsetSeqMetadata because changing it in the 
middle can lead to unpredictable results.
   - For old checkpoints without the configuration, it assumes the default 
policy as min (irrespective of the policy set at the session where the query is 
being restarted). This is to ensure that existing queries are affected in any 
way.

TODO
- [ ] Add a test for recovery from existing checkpoints.

## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #21701 from tdas/SPARK-24730.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6078b891
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6078b891
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6078b891

Branch: refs/heads/master
Commit: 6078b891da8fe7fc36579699473168ae7443284c
Parents: 32cb508
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Jul 10 18:03:40 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jul 10 18:03:40 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  15 ++
 .../streaming/MicroBatchExecution.scala         |   4 +-
 .../sql/execution/streaming/OffsetSeq.scala     |  37 ++++-
 .../execution/streaming/WatermarkTracker.scala  |  90 ++++++++++--
 .../commits/0                                   |   2 +
 .../commits/1                                   |   2 +
 .../metadata                                    |   1 +
 .../offsets/0                                   |   4 +
 .../offsets/1                                   |   4 +
 .../sql/streaming/EventTimeWatermarkSuite.scala | 136 ++++++++++++++++++-
 10 files changed, 276 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 50965c1..ae56cc9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -875,6 +875,21 @@ object SQLConf {
       .stringConf
       
.createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")
 
+  val STREAMING_MULTIPLE_WATERMARK_POLICY =
+    buildConf("spark.sql.streaming.multipleWatermarkPolicy")
+      .doc("Policy to calculate the global watermark value when there are 
multiple watermark " +
+        "operators in a streaming query. The default value is 'min' which 
chooses " +
+        "the minimum watermark reported across multiple operators. Other 
alternative value is" +
+        "'max' which chooses the maximum across multiple operators." +
+        "Note: This configuration cannot be changed between query restarts 
from the same " +
+        "checkpoint location.")
+      .stringConf
+      .checkValue(
+        str => Set("min", "max").contains(str.toLowerCase),
+        "Invalid value for 'spark.sql.streaming.multipleWatermarkPolicy'. " +
+          "Valid values are 'min' and 'max'")
+      .createWithDefault("min") // must be same as 
MultipleWatermarkPolicy.DEFAULT_POLICY_NAME
+
   val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD =
     buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold")
       .internal()

http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 17ffa2a..16651dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -61,7 +61,7 @@ class MicroBatchExecution(
     case _ => throw new IllegalStateException(s"Unknown type of trigger: 
$trigger")
   }
 
-  private val watermarkTracker = new WatermarkTracker()
+  private var watermarkTracker: WatermarkTracker = _
 
   override lazy val logicalPlan: LogicalPlan = {
     assert(queryExecutionThread eq Thread.currentThread,
@@ -257,6 +257,7 @@ class MicroBatchExecution(
           OffsetSeqMetadata.setSessionConf(metadata, 
sparkSessionToRunBatches.conf)
           offsetSeqMetadata = OffsetSeqMetadata(
             metadata.batchWatermarkMs, metadata.batchTimestampMs, 
sparkSessionToRunBatches.conf)
+          watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf)
           watermarkTracker.setWatermark(metadata.batchWatermarkMs)
         }
 
@@ -295,6 +296,7 @@ class MicroBatchExecution(
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
+        watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 7871744..1ae3f36 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -22,7 +22,7 @@ import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.RuntimeConfig
-import org.apache.spark.sql.internal.SQLConf.{SHUFFLE_PARTITIONS, 
STATE_STORE_PROVIDER_CLASS}
+import org.apache.spark.sql.internal.SQLConf._
 
 /**
  * An ordered collection of offsets, used to track the progress of processing 
data from one or more
@@ -86,7 +86,22 @@ case class OffsetSeqMetadata(
 
 object OffsetSeqMetadata extends Logging {
   private implicit val format = Serialization.formats(NoTypeHints)
-  private val relevantSQLConfs = Seq(SHUFFLE_PARTITIONS, 
STATE_STORE_PROVIDER_CLASS)
+  private val relevantSQLConfs = Seq(
+    SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS, 
STREAMING_MULTIPLE_WATERMARK_POLICY)
+
+  /**
+   * Default values of relevant configurations that are used for backward 
compatibility.
+   * As new configurations are added to the metadata, existing checkpoints may 
not have those
+   * confs. The values in this list ensures that the confs without recovered 
values are
+   * set to a default value that ensure the same behavior of the streaming 
query as it was before
+   * the restart.
+   *
+   * Note, that this is optional; set values here if you *have* to override 
existing session conf
+   * with a specific default value for ensuring same behavior of the query as 
before.
+   */
+  private val relevantSQLConfDefaultValues = Map[String, String](
+    STREAMING_MULTIPLE_WATERMARK_POLICY.key -> 
MultipleWatermarkPolicy.DEFAULT_POLICY_NAME
+  )
 
   def apply(json: String): OffsetSeqMetadata = 
Serialization.read[OffsetSeqMetadata](json)
 
@@ -115,8 +130,22 @@ object OffsetSeqMetadata extends Logging {
 
         case None =>
           // For backward compatibility, if a config was not recorded in the 
offset log,
-          // then log it, and let the existing conf value in SparkSession 
prevail.
-          logWarning (s"Conf '$confKey' was not found in the offset log, using 
existing value")
+          // then either inject a default value (if specified in 
`relevantSQLConfDefaultValues`) or
+          // let the existing conf value in SparkSession prevail.
+          relevantSQLConfDefaultValues.get(confKey) match {
+
+            case Some(defaultValue) =>
+              sessionConf.set(confKey, defaultValue)
+              logWarning(s"Conf '$confKey' was not found in the offset log, " +
+                s"using default value '$defaultValue'")
+
+            case None =>
+              val valueStr = sessionConf.getOption(confKey).map { v =>
+                s" Using existing session conf value '$v'."
+              }.getOrElse { " No value set in session conf." }
+              logWarning(s"Conf '$confKey' was not found in the offset log. 
$valueStr")
+
+          }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
index 8086566..7b30db4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
@@ -20,15 +20,68 @@ package org.apache.spark.sql.execution.streaming
 import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.internal.SQLConf
 
-class WatermarkTracker extends Logging {
+/**
+ * Policy to define how to choose a new global watermark value if there are
+ * multiple watermark operators in a streaming query.
+ */
+sealed trait MultipleWatermarkPolicy {
+  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
+}
+
+object MultipleWatermarkPolicy {
+  val DEFAULT_POLICY_NAME = "min"
+
+  def apply(policyName: String): MultipleWatermarkPolicy = {
+    policyName.toLowerCase match {
+      case DEFAULT_POLICY_NAME => MinWatermark
+      case "max" => MaxWatermark
+      case _ =>
+        throw new IllegalArgumentException(s"Could not recognize watermark 
policy '$policyName'")
+    }
+  }
+}
+
+/**
+ * Policy to choose the *min* of the operator watermark values as the global 
watermark value.
+ * Note that this is the safe (hence default) policy as the global watermark 
will advance
+ * only if all the individual operator watermarks have advanced. In other 
words, in a
+ * streaming query with multiple input streams and watermarks defined on all 
of them,
+ * the global watermark will advance as slowly as the slowest input. So if 
there is watermark
+ * based state cleanup or late-data dropping, then this policy is the most 
conservative one.
+ */
+case object MinWatermark extends MultipleWatermarkPolicy {
+  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
+    assert(operatorWatermarks.nonEmpty)
+    operatorWatermarks.min
+  }
+}
+
+/**
+ * Policy to choose the *min* of the operator watermark values as the global 
watermark value. So the
+ * global watermark will advance if any of the individual operator watermarks 
has advanced.
+ * In other words, in a streaming query with multiple input streams and 
watermarks defined on all
+ * of them, the global watermark will advance as fast as the fastest input. So 
if there is watermark
+ * based state cleanup or late-data dropping, then this policy is the most 
aggressive one and
+ * may lead to unexpected behavior if the data of the slow stream is delayed.
+ */
+case object MaxWatermark extends MultipleWatermarkPolicy {
+  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
+    assert(operatorWatermarks.nonEmpty)
+    operatorWatermarks.max
+  }
+}
+
+/** Tracks the watermark value of a streaming query based on a given `policy` 
*/
+case class WatermarkTracker(policy: MultipleWatermarkPolicy) extends Logging {
   private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
-  private var watermarkMs: Long = 0
-  private var updated = false
+  private var globalWatermarkMs: Long = 0
 
   def setWatermark(newWatermarkMs: Long): Unit = synchronized {
-    watermarkMs = newWatermarkMs
+    globalWatermarkMs = newWatermarkMs
   }
 
   def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
@@ -37,7 +90,6 @@ class WatermarkTracker extends Logging {
     }
     if (watermarkOperators.isEmpty) return
 
-
     watermarkOperators.zipWithIndex.foreach {
       case (e, index) if e.eventTimeStats.value.count > 0 =>
         logDebug(s"Observed event time stats $index: 
${e.eventTimeStats.value}")
@@ -58,16 +110,28 @@ class WatermarkTracker extends Logging {
     // This is the safest option, because only the global watermark is 
fault-tolerant. Making
     // it the minimum of all individual watermarks guarantees it will never 
advance past where
     // any individual watermark operator would be if it were in a plan by 
itself.
-    val newWatermarkMs = operatorToWatermarkMap.minBy(_._2)._2
-    if (newWatermarkMs > watermarkMs) {
-      logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-      watermarkMs = newWatermarkMs
-      updated = true
+    val chosenGlobalWatermark = 
policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
+    if (chosenGlobalWatermark > globalWatermarkMs) {
+      logInfo(s"Updating event-time watermark from $globalWatermarkMs to 
$chosenGlobalWatermark ms")
+      globalWatermarkMs = chosenGlobalWatermark
     } else {
-      logDebug(s"Event time didn't move: $newWatermarkMs < $watermarkMs")
-      updated = false
+      logDebug(s"Event time watermark didn't move: $chosenGlobalWatermark < 
$globalWatermarkMs")
     }
   }
 
-  def currentWatermark: Long = synchronized { watermarkMs }
+  def currentWatermark: Long = synchronized { globalWatermarkMs }
+}
+
+object WatermarkTracker {
+  def apply(conf: RuntimeConfig): WatermarkTracker = {
+    // If the session has been explicitly configured to use non-default policy 
then use it,
+    // otherwise use the default `min` policy as thats the safe thing to do.
+    // When recovering from a checkpoint location, it is expected that the 
`conf` will already
+    // be configured with the value present in the checkpoint. If there is no 
policy explicitly
+    // saved in the checkpoint (e.g., old checkpoints), then the default `min` 
policy is enforced
+    // through defaults specified in OffsetSeqMetadata.setSessionConf().
+    val policyName = conf.get(
+      SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY, 
MultipleWatermarkPolicy.DEFAULT_POLICY_NAME)
+    new WatermarkTracker(MultipleWatermarkPolicy(policyName))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/commits/0
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/commits/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/commits/0
new file mode 100644
index 0000000..83321cd
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/commits/0
@@ -0,0 +1,2 @@
+v1
+{}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/commits/1
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/commits/1
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/commits/1
new file mode 100644
index 0000000..83321cd
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/commits/1
@@ -0,0 +1,2 @@
+v1
+{}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/metadata
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/metadata
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/metadata
new file mode 100644
index 0000000..d6be7fb
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/metadata
@@ -0,0 +1 @@
+{"id":"549eeb1a-d762-420c-bb44-3fd6d73a5268"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/offsets/0
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/offsets/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/offsets/0
new file mode 100644
index 0000000..43db49d
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/offsets/0
@@ -0,0 +1,4 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1531172902041,"conf":{"spark.sql.shuffle.partitions":"10","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+0
+0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/offsets/1
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/offsets/1
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/offsets/1
new file mode 100644
index 0000000..8cc898e
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/offsets/1
@@ -0,0 +1,4 @@
+v1
+{"batchWatermarkMs":10000,"batchTimestampMs":1531172902217,"conf":{"spark.sql.shuffle.partitions":"10","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+1
+0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/6078b891/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 7e8fde1..58ed979 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -18,18 +18,22 @@
 package org.apache.spark.sql.streaming
 
 import java.{util => ju}
+import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Date}
 
+import org.apache.commons.io.FileUtils
 import org.scalatest.{BeforeAndAfter, Matchers}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode._
+import org.apache.spark.util.Utils
 
 class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with 
Matchers with Logging {
 
@@ -484,6 +488,136 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
     testWithFlag(false)
   }
 
+  test("MultipleWatermarkPolicy: max") {
+    val input1 = MemoryStream[Int]
+    val input2 = MemoryStream[Int]
+
+    withSQLConf(SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max") {
+      testStream(dfWithMultipleWatermarks(input1, input2))(
+        MultiAddData(input1, 20)(input2, 30),
+        CheckLastBatch(20, 30),
+        checkWatermark(input1, 15), // max(20 - 10, 30 - 15) = 15
+        StopStream,
+        StartStream(),
+        checkWatermark(input1, 15), // watermark recovered correctly
+        MultiAddData(input1, 120)(input2, 130),
+        CheckLastBatch(120, 130),
+        checkWatermark(input1, 115), // max(120 - 10, 130 - 15) = 115, policy 
recovered correctly
+        AddData(input1, 150),
+        CheckLastBatch(150),
+        checkWatermark(input1, 140)  // should advance even if one of the 
input has data
+      )
+    }
+  }
+
+  test("MultipleWatermarkPolicy: min") {
+    val input1 = MemoryStream[Int]
+    val input2 = MemoryStream[Int]
+
+    withSQLConf(SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "min") {
+      testStream(dfWithMultipleWatermarks(input1, input2))(
+        MultiAddData(input1, 20)(input2, 30),
+        CheckLastBatch(20, 30),
+        checkWatermark(input1, 10), // min(20 - 10, 30 - 15) = 10
+        StopStream,
+        StartStream(),
+        checkWatermark(input1, 10), // watermark recovered correctly
+        MultiAddData(input1, 120)(input2, 130),
+        CheckLastBatch(120, 130),
+        checkWatermark(input2, 110), // min(120 - 10, 130 - 15) = 110, policy 
recovered correctly
+        AddData(input2, 150),
+        CheckLastBatch(150),
+        checkWatermark(input2, 110)  // does not advance when only one of the 
input has data
+      )
+    }
+  }
+
+  test("MultipleWatermarkPolicy: recovery from checkpoints ignores session 
conf") {
+    val input1 = MemoryStream[Int]
+    val input2 = MemoryStream[Int]
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    withSQLConf(SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max") {
+      testStream(dfWithMultipleWatermarks(input1, input2))(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        MultiAddData(input1, 20)(input2, 30),
+        CheckLastBatch(20, 30),
+        checkWatermark(input1, 15) // max(20 - 10, 30 - 15) = 15
+      )
+    }
+
+    withSQLConf(SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "min") {
+      testStream(dfWithMultipleWatermarks(input1, input2))(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        checkWatermark(input1, 15), // watermark recovered correctly
+        MultiAddData(input1, 120)(input2, 130),
+        CheckLastBatch(120, 130),
+        checkWatermark(input1, 115), // max(120 - 10, 130 - 15) = 115, policy 
recovered correctly
+        AddData(input1, 150),
+        CheckLastBatch(150),
+        checkWatermark(input1, 140) // should advance even if one of the input 
has data
+      )
+    }
+  }
+
+  test("MultipleWatermarkPolicy: recovery from Spark ver 2.3.1 checkpoints 
ensures min policy") {
+    val input1 = MemoryStream[Int]
+    val input2 = MemoryStream[Int]
+
+    val resourceUri = this.getClass.getResource(
+      
"/structured-streaming/checkpoint-version-2.3.1-for-multi-watermark-policy/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail 
subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    input1.addData(20)
+    input2.addData(30)
+    input1.addData(10)
+
+    withSQLConf(SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max") {
+      testStream(dfWithMultipleWatermarks(input1, input2))(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        Execute { _.processAllAvailable() },
+        MultiAddData(input1, 120)(input2, 130),
+        CheckLastBatch(120, 130),
+        checkWatermark(input2, 110), // should calculate 'min' even if session 
conf has 'max' policy
+        AddData(input2, 150),
+        CheckLastBatch(150),
+        checkWatermark(input2, 110)
+      )
+    }
+  }
+
+  test("MultipleWatermarkPolicy: fail on incorrect conf values") {
+    val invalidValues = Seq("", "random")
+    invalidValues.foreach { value =>
+      val e = intercept[IllegalArgumentException] {
+        spark.conf.set(SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key, value)
+      }
+      assert(e.getMessage.toLowerCase.contains("valid values are 'min' and 
'max'"))
+    }
+  }
+
+  private def dfWithMultipleWatermarks(
+      input1: MemoryStream[Int],
+      input2: MemoryStream[Int]): Dataset[_] = {
+    val df1 = input1.toDF
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+    val df2 = input2.toDF
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "15 seconds")
+    df1.union(df2).select($"eventTime".cast("int"))
+  }
+
+  private def checkWatermark(input: MemoryStream[Int], watermark: Long) = 
Execute { q =>
+    input.addData(1)
+    q.processAllAvailable()
+    assert(q.lastProgress.eventTime.get("watermark") == 
formatTimestamp(watermark))
+  }
+
   private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = 
AssertOnQuery { q =>
     q.processAllAvailable()
     val progressWithData = q.recentProgress.lastOption.get


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to