HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1115352110


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala:
##########
@@ -548,17 +549,131 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     assert(e.getMessage contains "should not be negative.")
   }
 
-  test("the new watermark should override the old one") {
-    val df = MemoryStream[(Long, Long)].toDF()
+  private def buildTestQueryForOverridingWatermark(): (MemoryStream[(Long, Long)], DataFrame) = {
+    val input = MemoryStream[(Long, Long)]
+    val df = input.toDF()
       .withColumn("first", timestamp_seconds($"_1"))
       .withColumn("second", timestamp_seconds($"_2"))
       .withWatermark("first", "1 minute")
+      .select("*")
       .withWatermark("second", "2 minutes")
+      .groupBy(window($"second", "1 minute"))
+      .count()
 
-    val eventTimeColumns = df.logicalPlan.output
-      .filter(_.metadata.contains(EventTimeWatermark.delayKey))
-    assert(eventTimeColumns.size === 1)
-    assert(eventTimeColumns(0).name === "second")
+    (input, df)
+  }
+
+  test("overriding watermark should not be allowed by default") {
+    val (input, df) = buildTestQueryForOverridingWatermark()
+    testStream(df)(
+      AddData(input, (100L, 200L)),
+      ExpectFailure[AnalysisException](assertFailure = exc => {
+        assert(exc.getMessage.contains("Redefining watermark is disallowed."))
+        assert(exc.getMessage.contains(SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key))
+      })
+    )
+  }
+
+  test("overriding watermark should not fail in compatibility mode") {
+    withSQLConf(SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key -> "false") {
+      val (input, df) = buildTestQueryForOverridingWatermark()
+      testStream(df)(
+        AddData(input, (100L, 200L)),
+        CheckAnswer(),
+        Execute { query =>
+          val lastExecution = query.lastExecution
+          val aggSaveOperator = lastExecution.executedPlan.collect {
+            case j: StateStoreSaveExec => j
+          }.head
+
+          // - watermark from first definition = 100 - 60 = 40
+          // - watermark from second definition = 200 - 120 = 80
+          // - global watermark = min(40, 80) = 40
+          //
+          // As the result shows, even though we override the watermark definition, the old
+          // definition still participates in calculating the global watermark.
+          //
+          // This is conceptually the right behavior. For operators after the first watermark
+          // definition, the column named "first" is considered the event time column, and for
+          // operators after the second watermark definition, the column named "second" is
+          // considered the event time column. The single watermark value satisfying all
+          // operators must be the lower bound over both columns "first" and "second".
+          //
+          // That said, this easily leads to incorrect definitions - e.g. redefining the
+          // watermark against the output of a streaming aggregation in append mode, where the
+          // global watermark can never advance. This is why the new behavior disallows
+          // redefining the watermark.
+          val expectedWatermarkMs = 40 * 1000
+
+          assert(aggSaveOperator.eventTimeWatermarkForLateEvents === Some(expectedWatermarkMs))
+          assert(aggSaveOperator.eventTimeWatermarkForEviction === Some(expectedWatermarkMs))
+
+          val eventTimeCols = aggSaveOperator.keyExpressions.filter(
+            _.metadata.contains(EventTimeWatermark.delayKey))
+          assert(eventTimeCols.size === 1)
+          assert(eventTimeCols.head.name === "window")
+          // 2 minutes delay threshold
+          assert(eventTimeCols.head.metadata.getLong(EventTimeWatermark.delayKey) === 120 * 1000)
+        }
+      )
+    }
+  }
+
+  private def buildTestQueryForMultiEventTimeColumns()
+    : (MemoryStream[(String, Long)], MemoryStream[(String, Long)], DataFrame) = {
+    val input1 = MemoryStream[(String, Long)]
+    val input2 = MemoryStream[(String, Long)]
+    val df1 = input1.toDF()
+      .selectExpr("_1 AS id1", "timestamp_seconds(_2) AS ts1")
+      .withWatermark("ts1", "1 minute")
+
+    val df2 = input2.toDF()
+      .selectExpr("_1 AS id2", "timestamp_seconds(_2) AS ts2")
+      .withWatermark("ts2", "2 minutes")
+
+    val joined = df1.join(df2, expr("id1 = id2 AND ts1 = ts2 + INTERVAL 10 SECONDS"), "inner")
+      .selectExpr("id1", "ts1", "ts2")
+    // the output of the join contains both ts1 and ts2
+    val dedup = joined.dropDuplicates()
+      .selectExpr("id1", "CAST(ts1 AS LONG) AS ts1", "CAST(ts2 AS LONG) AS 
ts2")
+
+    (input1, input2, dedup)
+  }
+
+  test("multiple event time columns in an input DataFrame for stateful 
operator is " +

Review Comment:
   Please note that the same query runs under compatibility mode in the test
   case below, but there has never been a guarantee of correctness. I'd prefer
   blocking the query for the potential correctness issue (hence we do), but I
   think we also want an escape hatch (hence compatibility mode exists).
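
   As a side note for reviewers, here is a minimal sketch of how a user would
   opt into the escape hatch. This is not code from this PR; it assumes
   `SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE` resolves to the key
   `spark.sql.streaming.statefulOperator.allowMultiple`:

   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions._

   val spark = SparkSession.builder()
     .master("local[2]")
     .appName("watermark-compat-sketch")
     // Assumed config key; "false" restores the legacy behavior where
     // redefining the watermark is tolerated instead of failing analysis.
     .config("spark.sql.streaming.statefulOperator.allowMultiple", "false")
     .getOrCreate()
   import spark.implicits._

   // Same shape as the test query: the second withWatermark overrides the
   // first, but the global watermark is still the min over both definitions,
   // e.g. min(100 - 60, 200 - 120) = 40 seconds for the inputs in the test.
   val df = spark.readStream.format("rate").load()
     .withColumn("first", $"timestamp")
     .withColumn("second", expr("timestamp + INTERVAL 100 SECONDS"))
     .withWatermark("first", "1 minute")
     .withWatermark("second", "2 minutes")
     .groupBy(window($"second", "1 minute"))
     .count()
   ```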



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

