MaxGekk commented on a change in pull request #33456:
URL: https://github.com/apache/spark/pull/33456#discussion_r674327457



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
##########
@@ -765,6 +765,55 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("SPARK-35815: Support ANSI intervals for delay threshold") {
+    Seq(
+      // Conventional form and some variants
+      (Seq("3 days", "Interval 3 day", "inTerval '3' day"), 259200000),
+      (Seq("5 hours", "INTERVAL 5 hour", "interval '5' hour"), 18000000),
+      (Seq("8 minutes", "interval 8 minute", "interval '8' minute"), 480000),
+      (Seq("10 seconds", "interval 10 second", "interval '10' second"), 10000),

Review comment:
       Could you replace the magic numbers with constants like `10 * MILLIS_PER_SECOND`? Right now it is hard to verify the correctness of the expected values, especially the ones below.
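       For example, a minimal sketch of how the first few expected values could be expressed via the constants in `DateTimeConstants` (illustrative only, not the PR's code):

```scala
import org.apache.spark.sql.catalyst.util.DateTimeConstants._

// Expected delays written in terms of the existing constants instead of
// precomputed magic numbers, so each value can be verified at a glance.
val expectedDelays: Seq[(Seq[String], Long)] = Seq(
  (Seq("3 days", "Interval 3 day", "inTerval '3' day"), 3 * MILLIS_PER_DAY),
  (Seq("5 hours", "INTERVAL 5 hour", "interval '5' hour"), 5 * MILLIS_PER_HOUR),
  (Seq("8 minutes", "interval 8 minute", "interval '8' minute"), 8 * MILLIS_PER_MINUTE),
  (Seq("10 seconds", "interval 10 second", "interval '10' second"), 10 * MILLIS_PER_SECOND)
)
```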

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -739,13 +740,23 @@ class Dataset[T] private[sql](
   // We only accept an existing column name, not a derived column here as a watermark that is
   // defined on a derived column cannot referenced elsewhere in the plan.
   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
-    val parsedDelay =
-      try {
+    val parsedDelay = try {
+      if (delayThreshold.toLowerCase(Locale.ROOT).startsWith("interval")) {
+        CatalystSqlParser.parseExpression(delayThreshold) match {
+          case Literal(months: Int, _: YearMonthIntervalType) =>
+            new CalendarInterval(months, 0, 0)
+          case Literal(micros: Long, _: DayTimeIntervalType) =>
+            val days = micros / DateTimeConstants.MICROS_PER_DAY
+            val restMicros = micros % DateTimeConstants.MICROS_PER_DAY

Review comment:
       What's the reason for performing this normalization? Why can't you just pass `micros` as the third parameter?
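       A minimal sketch of the simpler path (hypothetical helper name, assuming the `CalendarInterval(months, days, microseconds)` constructor carries the full microsecond value):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.DayTimeIntervalType
import org.apache.spark.unsafe.types.CalendarInterval

// Hypothetical helper: pass the microseconds through unchanged instead of
// splitting them into days plus the remaining micros.
def toDelayInterval(parsed: Expression): CalendarInterval = parsed match {
  case Literal(micros: Long, _: DayTimeIntervalType) =>
    new CalendarInterval(0, 0, micros)
  case other =>
    throw new IllegalArgumentException(s"Unexpected delay threshold: $other")
}
```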

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
##########
@@ -765,6 +765,55 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("SPARK-35815: Support ANSI intervals for delay threshold") {
+    Seq(
+      // Conventional form and some variants
+      (Seq("3 days", "Interval 3 day", "inTerval '3' day"), 259200000),
+      (Seq("5 hours", "INTERVAL 5 hour", "interval '5' hour"), 18000000),
+      (Seq("8 minutes", "interval 8 minute", "interval '8' minute"), 480000),
+      (Seq("10 seconds", "interval 10 second", "interval '10' second"), 10000),
+      (Seq("1 years", "interval 1 year", "interval '1' year"), 32140800000L),
+      (Seq("1 months", "interval 1 month", "interval '1' month"), 2678400000L),
+      (Seq(
+        "1 day 2 hours 3 minutes 4 seconds",
+        "interval 1 day 2 hours 3 minutes 4 seconds",
+        "interval '1' day '2' hours '3' minutes '4' seconds",
+        "interval '1 2:3:4' day to second"), 93784000),
+      (Seq(
+        "1 year 2 months",
+        "interval 1 year 2 month",
+        "interval '1' year '2' month",
+        "interval '1-2' year to month"), 37497600000L)
+    ).foreach { case (delayThresholdVariants, expectedMs) =>
+      delayThresholdVariants.foreach { case delayThreshold =>
+        val df = MemoryStream[Int].toDF
+          .withColumn("eventTime", timestamp_seconds($"value"))
+          .withWatermark("eventTime", delayThreshold)
+        val eventTimeAttr = df.queryExecution.analyzed.output.find(a => a.name == "eventTime")
+        assert(eventTimeAttr.isDefined)
+        val metadata = eventTimeAttr.get.metadata
+        assert(metadata.contains(EventTimeWatermark.delayKey))
+        assert(metadata.getLong(EventTimeWatermark.delayKey) === expectedMs)
+      }
+    }
+
+    // Invalid interval patterns

Review comment:
       Should we allow an interval which begins from gaps like `  interval `?
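       If so, a minimal sketch (illustrative only, not the PR's code) could trim before the prefix check:

```scala
import java.util.Locale

// Trimming first would let thresholds with leading whitespace, such as
// "  interval 10 seconds", also take the ANSI-interval parsing path.
val delayThreshold = "  interval 10 seconds"
val isAnsiIntervalSyntax =
  delayThreshold.trim.toLowerCase(Locale.ROOT).startsWith("interval")
```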




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
