HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1115352110
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala:
##########
@@ -548,17 +549,131 @@ class EventTimeWatermarkSuite extends StreamTest with
BeforeAndAfter with Matche
assert(e.getMessage contains "should not be negative.")
}
- test("the new watermark should override the old one") {
- val df = MemoryStream[(Long, Long)].toDF()
+ private def buildTestQueryForOverridingWatermark(): (MemoryStream[(Long,
Long)], DataFrame) = {
+ val input = MemoryStream[(Long, Long)]
+ val df = input.toDF()
.withColumn("first", timestamp_seconds($"_1"))
.withColumn("second", timestamp_seconds($"_2"))
.withWatermark("first", "1 minute")
+ .select("*")
.withWatermark("second", "2 minutes")
+ .groupBy(window($"second", "1 minute"))
+ .count()
- val eventTimeColumns = df.logicalPlan.output
- .filter(_.metadata.contains(EventTimeWatermark.delayKey))
- assert(eventTimeColumns.size === 1)
- assert(eventTimeColumns(0).name === "second")
+ (input, df)
+ }
+
+ test("overriding watermark should not be allowed by default") {
+ val (input, df) = buildTestQueryForOverridingWatermark()
+ testStream(df)(
+ AddData(input, (100L, 200L)),
+ ExpectFailure[AnalysisException](assertFailure = exc => {
+ assert(exc.getMessage.contains("Redefining watermark is disallowed."))
+
assert(exc.getMessage.contains(SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key))
+ })
+ )
+ }
+
+ test("overriding watermark should not fail in compatibility mode") {
+ withSQLConf(SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key -> "false") {
+ val (input, df) = buildTestQueryForOverridingWatermark()
+ testStream(df)(
+ AddData(input, (100L, 200L)),
+ CheckAnswer(),
+ Execute { query =>
+ val lastExecution = query.lastExecution
+ val aggSaveOperator = lastExecution.executedPlan.collect {
+ case j: StateStoreSaveExec => j
+ }.head
+
+ // - watermark from first definition = 100 - 60 = 40
+ // - watermark from second definition = 200 - 120 = 80
+ // - global watermark = min(40, 80) = 40
+ //
+ // As we see the result, even though we override the watermark
definition, the old
+ // definition of watermark still plays to calculate global watermark.
+ //
+ // This is conceptually the right behavior. For operators after the
first watermark
+ // definition, the column named "first" is considered as event time
column, and for
+ // operators after the second watermark definition, the column named
"second" is
+ // considered as event time column. The correct "single" value of
watermark satisfying
+ // all operators should be lower bound of both columns "first" and
"second".
+ //
+ // That said, this easily leads to an incorrect definition - e.g.
re-defining the watermark
+ // against the output of streaming aggregation for append mode. In that case the
global watermark
+ // cannot advance. This is the reason we don't allow re-defining the
watermark in the new behavior.
+ val expectedWatermarkMs = 40 * 1000
+
+ assert(aggSaveOperator.eventTimeWatermarkForLateEvents ===
Some(expectedWatermarkMs))
+ assert(aggSaveOperator.eventTimeWatermarkForEviction ===
Some(expectedWatermarkMs))
+
+ val eventTimeCols = aggSaveOperator.keyExpressions.filter(
+ _.metadata.contains(EventTimeWatermark.delayKey))
+ assert(eventTimeCols.size === 1)
+ assert(eventTimeCols.head.name === "window")
+ // 2 minutes delay threshold
+
assert(eventTimeCols.head.metadata.getLong(EventTimeWatermark.delayKey) === 120
* 1000)
+ }
+ )
+ }
+ }
+
+ private def buildTestQueryForMultiEventTimeColumns()
+ : (MemoryStream[(String, Long)], MemoryStream[(String, Long)], DataFrame)
= {
+ val input1 = MemoryStream[(String, Long)]
+ val input2 = MemoryStream[(String, Long)]
+ val df1 = input1.toDF()
+ .selectExpr("_1 AS id1", "timestamp_seconds(_2) AS ts1")
+ .withWatermark("ts1", "1 minute")
+
+ val df2 = input2.toDF()
+ .selectExpr("_1 AS id2", "timestamp_seconds(_2) AS ts2")
+ .withWatermark("ts2", "2 minutes")
+
+ val joined = df1.join(df2, expr("id1 = id2 AND ts1 = ts2 + INTERVAL 10
SECONDS"), "inner")
+ .selectExpr("id1", "ts1", "ts2")
+ // the output of join contains both ts1 and ts2
+ val dedup = joined.dropDuplicates()
+ .selectExpr("id1", "CAST(ts1 AS LONG) AS ts1", "CAST(ts2 AS LONG) AS
ts2")
+
+ (input1, input2, dedup)
+ }
+
+ test("multiple event time columns in an input DataFrame for stateful
operator is " +
Review Comment:
Please note that the same query runs in compatibility mode, as in the test
case below, but there has been no guarantee of correctness. I'd prefer blocking
the query because of the possible correctness issue (hence we do), but I think
we may also want to have an escape hatch (hence compatibility mode is
there).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]