cloud-fan commented on a change in pull request #24319: 
[SPARK-25496][SQL][followup] avoid using to_utc_timestamp
URL: https://github.com/apache/spark/pull/24319#discussion_r273067529
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ##########
 @@ -342,55 +342,52 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest with Assertions {
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {
-    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
-      import testImplicits._
-      val clock = new StreamManualClock
-      val tz = TimeZone.getDefault.getID
-      val inputData = MemoryStream[Long]
-      val aggregated =
-        inputData.toDF()
-          .select(to_utc_timestamp(from_unixtime('value * 
DateTimeUtils.SECONDS_PER_DAY), tz))
-          .toDF("value")
-          .groupBy($"value")
-          .agg(count("*"))
-          .where($"value".cast("date") >= date_sub(current_date(), 10))
-          .select(
-            ($"value".cast("long") / 
DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-      testStream(aggregated, Complete)(
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // advance clock to 10 days, should retain all keys
-        AddData(inputData, 0L, 5L, 5L, 10L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-        // advance clock to 20 days, should retain keys >= 10
-        AddData(inputData, 15L, 15L, 20L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-        // advance clock to 30 days, should retain keys >= 20
-        AddData(inputData, 85L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // bounce stream and ensure correct batch timestamp is used
-        // i.e., we don't take it from the clock, which is at 90 days.
-        StopStream,
-        AssertOnQuery { q => // clear the sink
-          q.sink.asInstanceOf[MemorySink].clear()
-          q.commitLog.purge(3)
-          // advance by 60 days i.e., 90 days total
-          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-          true
-        },
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // Commit log blown, causing a re-run of the last batch
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // advance clock to 100 days, should retain keys >= 90
-        AddData(inputData, 85L, 90L, 100L, 105L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-      )
-    }
+    import testImplicits._
+    val clock = new StreamManualClock
+    val tz = TimeZone.getDefault.getID
+    val inputData = MemoryStream[Long]
+    val aggregated =
+      inputData.toDF()
+        .select(($"value" * 
DateTimeUtils.SECONDS_PER_DAY).cast("timestamp").as("value"))
 
 Review comment:
   there is one more: I replaced `current_date` with 
`current_timestamp().cast("date")`.
   
   `current_date` returns date in UTC. We can't compare it with the value of 
casting timestamp column to date. There is a timezone shift when casting 
between date and timestamp.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to