MaxGekk commented on a change in pull request #28119: [WIP][SPARK-31359][SQL] 
Speed up timestamps rebasing
URL: https://github.com/apache/spark/pull/28119#discussion_r405708496
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
 ##########
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time.{LocalDateTime, ZoneId}
+import java.time.temporal.ChronoField
+import java.util.{Calendar, TimeZone}
+
+import scala.collection.mutable.AnyRefMap
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, 
ScalaObjectMapper}
+
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+
+/**
+ * The collection of functions for rebasing days and microseconds from/to 
Proleptic Gregorian
+ * calendar used by default in Spark SQL since version 3.0, see SPARK-26651 
to/from
+ * the hybrid calendar (Julian + Gregorian since 1582-10-15) which is used by 
Spark 2.4
+ * and earlier versions.
+ */
+object RebaseDateTime {
+  /**
+   * Rebases days since the epoch from an original to a target calendar, for instance,
+   * from a hybrid (Julian + Gregorian) to Proleptic Gregorian calendar.
+   *
+   * It finds the latest switch day which is less than or equal to `value`, and adds the
+   * difference in days associated with that switch day to the given `value`.
+   * The function is based on a linear search which starts from the most recent switch day.
+   * This allows performing fewer comparisons for modern dates.
+   *
+   * @param switches The days when difference in days between original and 
target
+   *                   calendar was changed.
+   * @param diffs The differences in days between calendars.
+   * @param value The number of days since the epoch 1970-01-01 to be rebased 
to the
+   *             target calendar.
+   * @return The rebased days.
+   */
+  private def rebaseDays(switches: Array[Int], diffs: Array[Int], value: Int): Int = {
+    // Walk backwards from the most recent switch day and stop at the first one that is
+    // not greater than `value`; index 0 is the fallback for all earlier days.
+    var idx = switches.length - 1
+    while (idx > 0 && value < switches(idx)) {
+      idx -= 1
+    }
+    value + diffs(idx)
+  }
+
+  // Day offsets between Julian and Proleptic Gregorian dates. The offset at the index `i`
+  // applies to every day in the half-open date interval:
+  // [julianGregDiffSwitchDay(i), julianGregDiffSwitchDay(i+1))
+  // The values are laid out in the same rows as the switch days below.
+  private val julianGregDiffs: Array[Int] = Array(
+    2, 1, 0, -1, -2, -3, -4,
+    -5, -6, -7, -8, -9, -10, 0)
+  // Sorted days in Julian calendar at which the difference in days between Julian and
+  // Proleptic Gregorian calendars changed. The first element corresponds to `0001-01-01`
+  // (-719164 days since the epoch in Julian calendar). All dates before that starting
+  // point share the same difference of 2 days between the two calendars.
+  private val julianGregDiffSwitchDay: Array[Int] = Array(
+    -719164, -682945, -646420, -609895, -536845, -500320, -463795,
+    -390745, -354220, -317695, -244645, -208120, -171595, -141427)
+
+  /**
+   * Converts the given number of days since the epoch day 1970-01-01 to
+   * a local date in Julian calendar, interprets the result as a local
+   * date in Proleptic Gregorian calendar, and takes the number of days
+   * since the epoch from the Gregorian date.
+   *
+   * @param days The number of days since the epoch in Julian calendar.
+   * @return The rebased number of days in Gregorian calendar.
+   */
+  def rebaseJulianToGregorianDays(days: Int): Int =
+    // Apply the Julian -> Proleptic Gregorian day shift taken from the tables above.
+    rebaseDays(switches = julianGregDiffSwitchDay, diffs = julianGregDiffs, value = days)
+
+  // Day offsets between Proleptic Gregorian and Julian dates. The offset at the index `i`
+  // applies to every day in the half-open date interval:
+  // [gregJulianDiffSwitchDay(i), gregJulianDiffSwitchDay(i+1))
+  // The values are laid out in the same rows as the switch days below.
+  private val gregJulianDiffs: Array[Int] = Array(
+    -2, -1, 0, 1, 2, 3, 4,
+    5, 6, 7, 8, 9, 10, 0)
+  // Sorted days in Proleptic Gregorian calendar at which the difference in days between
+  // Proleptic Gregorian and Julian calendars changed. The first element corresponds to
+  // `0001-01-01` (-719162 days since the epoch in Proleptic Gregorian calendar). All dates
+  // before that starting point share the same difference of -2 days between the calendars.
+  private val gregJulianDiffSwitchDay: Array[Int] = Array(
+    -719162, -682944, -646420, -609896, -536847, -500323, -463799,
+    -390750, -354226, -317702, -244653, -208129, -171605, -141427)
+
+  /**
+   * Rebasing days since the epoch to store the same number of days
+   * as by Spark 2.4 and earlier versions. Spark 3.0 switched to
+   * Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of 
that,
+   * this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
+   * Julian calendar for dates before 1582-10-15. So, the same local date may
+   * be mapped to different number of days since the epoch in different 
calendars.
+   *
+   * For example:
+   *   Proleptic Gregorian calendar: 1582-01-01 -> -141714
+   *   Julian calendar: 1582-01-01 -> -141704
+   * The code below converts -141714 to -141704.
+   *
+   * @param days The number of days since the epoch 1970-01-01. It can be 
negative.
+   * @return The rebased number of days since the epoch in Julian calendar.
+   */
+  def rebaseGregorianToJulianDays(days: Int): Int =
+    // Apply the Proleptic Gregorian -> Julian day shift taken from the tables above.
+    rebaseDays(switches = gregJulianDiffSwitchDay, diffs = gregJulianDiffs, value = days)
+
+  /**
+   * Converts the given microseconds to a local date-time at the time zone `zoneId` in
+   * Proleptic Gregorian calendar, interprets the result as a local date-time in the hybrid
+   * (Julian + Gregorian) calendar, and takes microseconds since the epoch from that timestamp.
+   *
+   * @param zoneId The time zone ID at which the rebasing should be performed.
+   * @param micros The number of microseconds since the epoch 
'1970-01-01T00:00:00Z'.
+   * @return The rebased microseconds since the epoch in Julian calendar.
+   */
+  private[sql] def rebaseGregorianToJulianMicros(zoneId: ZoneId, micros: Long): Long = {
+    // Local date-time fields of `micros` at `zoneId` in Proleptic Gregorian calendar.
+    val instant = microsToInstant(micros)
+    val ldt = instant.atZone(zoneId).toLocalDateTime
+    val cal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      // The builder uses the JVM default time zone unless one is set explicitly. The local
+      // fields below were computed at `zoneId`, so they must be resolved at the same zone.
+      .setTimeZone(TimeZone.getTimeZone(zoneId))
+      .setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
+      .setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
+      // Local time-line can overlap, such as at an autumn daylight savings cutover.
+      // This setting selects the original local timestamp mapped to the given `micros`.
+      .set(Calendar.DST_OFFSET, zoneId.getRules.getDaylightSavings(instant).toMillis.toInt)
+      .build()
+    // The calendar was built from whole seconds only, so the microsecond-of-second
+    // fraction of the original local timestamp is added back here.
+    millisToMicros(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
+  }
+
+
+  /**
+   * The class describes JSON records with microseconds rebasing info.
+   * Here is an example of JSON file:
+   * {{{
+   *   [
+   *     {
+   *       "tz": "Europe/Paris",
+   *       "switches": [-123, 0],
+   *       "diffs": [422000000, 0]
+   *     }
+   *   ]
+   * }}}
+   *
+   * @param tz One of time zone ID which is expected to be acceptable by 
`ZoneId.of`.
+   * @param switches An ordered array of seconds since the epoch when the diff 
between
+   *                 two calendars are changed.
+   * @param diffs Differences in seconds associated with elements of 
`switches`.
+   */
+  private case class JsonRebaseRecord(
+      tz: String,
+      switches: Array[Long],
+      diffs: Array[Long])
+
+  /**
+   * Rebasing info used to convert microseconds from an original to a target 
calendar.
+   *
+   * @param switches An ordered array of microseconds since the epoch when the 
diff between
+   *                 two calendars are changed.
+   * @param diffs Differences in microseconds associated with elements of 
`switches`.
+   */
+  private[sql] case class RebaseInfo(
+      switches: Array[Long],
+      diffs: Array[Long])
+
+  /**
+   * Rebases micros since the epoch from an original to a target calendar, for instance,
+   * from a hybrid (Julian + Gregorian) to Proleptic Gregorian calendar.
+   *
+   * It finds the latest switch micros which is less than or equal to `value`, and adds the
+   * difference in micros associated with that switch micros to the given `value`.
+   * The function is based on a linear search which starts from the most recent switch micros.
+   * This allows performing fewer comparisons for modern timestamps.
+   *
+   * @param rebaseInfo The rebasing info contains an ordered micros when 
difference in micros
+   *                   between original and target calendar was changed,
+   *                   and differences in micros between calendars
+   * @param value The number of micros since the epoch 1970-01-01 to be 
rebased to the
+   *              target calendar.
+   * @return The rebased micros.
+   */
+  private def rebaseMicros(rebaseInfo: RebaseInfo, value: Long): Long = {
+    val switchMicros = rebaseInfo.switches
+    // Walk backwards from the most recent switch point and stop at the first one that is
+    // not greater than `value`; index 0 is the fallback for all earlier timestamps.
+    var idx = switchMicros.length - 1
+    while (idx > 0 && value < switchMicros(idx)) {
+      idx -= 1
+    }
+    value + rebaseInfo.diffs(idx)
+  }
+
+  // Loads rebasing info from a JSON file. JSON records in the file should conform to
+  // `JsonRebaseRecord`. AnyRefMap is used here instead of Scala's immutable map because
+  // it is 2 times faster in DateTimeRebaseBenchmark.
+  private[sql] def loadRebaseRecords(fileName: String): AnyRefMap[String, RebaseInfo] = {
+    val file = Thread.currentThread().getContextClassLoader.getResource(fileName)
+    // `getResource` returns null when the resource is absent; fail here with a clear
+    // message instead of handing a null URL to the JSON mapper.
+    require(file != null, s"Cannot find the resource with rebase records: $fileName")
+    val mapper = new ObjectMapper() with ScalaObjectMapper
+    mapper.registerModule(DefaultScalaModule)
+    val jsonRebaseRecords = mapper.readValue[Seq[JsonRebaseRecord]](file)
+    // Pre-size the map so that it has spare capacity for all records.
+    val anyRefMap = new AnyRefMap[String, RebaseInfo]((3 * jsonRebaseRecords.size) / 2)
+    jsonRebaseRecords.foreach { jsonRecord =>
+      // Every switch point must have exactly one associated diff.
+      require(jsonRecord.switches.length == jsonRecord.diffs.length,
+        s"The rebase record for the time zone ${jsonRecord.tz} in $fileName has " +
+          s"${jsonRecord.switches.length} switches but ${jsonRecord.diffs.length} diffs")
+      // The JSON records keep seconds; `RebaseInfo` keeps microseconds.
+      val rebaseInfo = RebaseInfo(
+        jsonRecord.switches.map(_ * MICROS_PER_SECOND),
+        jsonRecord.diffs.map(_ * MICROS_PER_SECOND))
+      anyRefMap.update(jsonRecord.tz, rebaseInfo)
+    }
+    anyRefMap
+  }
+
+  /**
+   * A map of time zone IDs to their ordered time points (instants in microseconds since the
+   * epoch) when the difference between 2 instants associated with the same local timestamp in
+   * Proleptic Gregorian and the hybrid calendar was changed, and to the corresponding diffs.
+   * The diff at the index `i` is applicable for all microseconds in the time interval:
+   *   [switches(i), switches(i+1))
+   * where `switches` and `diffs` are the fields of `RebaseInfo`.
+  private val gregJulianRebaseMap: AnyRefMap[String, RebaseInfo] =
+    loadRebaseRecords("gregorian-julian-rebase-micros.json")
+
+  /**
+   * Rebases the given `micros` to the number of microseconds since the epoch via a local
+   * timestamp that has the same date-time fields in Proleptic Gregorian and in the hybrid
+   * calendars.
+   *
+   * The function may optimize the rebasing by using pre-calculated rebasing maps. If the maps
+   * don't contain information about the current JVM system time zone, the function falls back
+   * to the regular conversion mechanism via building local timestamps.
+   * Note: The function assumes that the input micros was derived from a local 
timestamp
+   *       at the default system JVM time zone in Proleptic Gregorian calendar.
+   *
+   * @param micros The microseconds since the epoch 
1970-01-01T00:00:00.000000Z.
+   * @return The microseconds since the epoch that have the same local 
timestamp representation
+   *         in the hybrid calendar (Julian + Gregorian) as the original 
`micros` in
+   *         Proleptic Gregorian calendar.
+   */
+  def rebaseGregorianToJulianMicros(micros: Long): Long = {
+    val defaultTz = TimeZone.getDefault
+    // A null result means that the map has no pre-calculated records for this time zone.
+    gregJulianRebaseMap.getOrNull(defaultTz.getID) match {
+      // Fall back to the conversion via building local timestamps.
+      case null => rebaseGregorianToJulianMicros(defaultTz.toZoneId, micros)
+      case rebaseRecord => rebaseMicros(rebaseRecord, micros)
+    }
+  }
+
+  /**
+   * Converts the given microseconds to a local date-time in UTC time zone in 
Julian calendar,
+   * interprets the result as a local date-time in Proleptic Gregorian 
calendar in UTC time zone.
+   * And takes microseconds since the epoch from the Gregorian timestamp.
+   *
+   * @param zoneId The time zone ID at which the rebasing should be performed.
+   * @param micros The number of microseconds since the epoch 
'1970-01-01T00:00:00Z'.
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to