MaxGekk commented on code in PR #56848:
URL: https://github.com/apache/spark/pull/56848#discussion_r3505187374
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala:
##########
@@ -1287,3 +1298,60 @@ object DateTimeUtils extends SparkDateTimeUtils {
c
}
}
+
+/**
+ * Per-task memoization of a zone's UTC offset, used by the
[[DateTimeUtils.truncTimestamp]] hot
+ * path. The session zone is constant for a query and the offset is
piecewise-constant between DST
+ * transitions, so consecutive rows almost always resolve to the same offset.
The cache holds the
+ * half-open epoch-second interval `[lo, hi)` on which the offset is provably
constant -- derived
+ * from the surrounding zone transitions -- so a lookup that falls in the
interval reduces to two
+ * comparisons instead of a transition-array binary search.
+ *
+ * Not thread-safe by design: a fresh instance is created per task (codegen
mutable state) and used
+ * single-threaded, mirroring how stateful per-row helpers are scoped in
generated code.
+ */
+class ZoneOffsetCache(val zoneId: ZoneId) {
+ private val rules = zoneId.getRules
+ val isFixedOffset: Boolean = rules.isFixedOffset
+
+ private var cached = false
+ private var lo = 0L
+ private var hi = 0L
+ private var offsetSec = 0L
+
+ /**
+ * Offset in seconds at `epochSec`, equal to
+ * `rules.getOffset(Instant.ofEpochSecond(epochSec)).getTotalSeconds`,
memoized over the
+ * constant-offset interval containing the previous lookup.
+ */
+ def offsetSeconds(epochSec: Long): Long = {
+ if (cached && epochSec >= lo && epochSec < hi) {
+ offsetSec
+ } else {
+ val instant = Instant.ofEpochSecond(epochSec)
+ val o = rules.getOffset(instant).getTotalSeconds.toLong
+ if (isFixedOffset) {
+ lo = Long.MinValue
+ hi = Long.MaxValue
+ } else {
+ val nextT = rules.nextTransition(instant)
Review Comment:
Question (non-blocking): on a cache *miss* this path now does three JDK rule
searches — `getOffset` + `nextTransition` + `previousTransition` — versus the
single `getOffset` before this PR. For temporally clustered data (the
benchmarked case) that's a clear win, but a workload that straddles a
transition on nearly every row, or randomly-distributed timestamps across a
zone with many historical transitions, misses every row and would be a net
regression vs the old path.
Did the benchmarks include a miss-heavy / random-timestamp case, or is that
worst case considered acceptable?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala:
##########
@@ -1287,3 +1298,60 @@ object DateTimeUtils extends SparkDateTimeUtils {
c
}
}
+
+/**
+ * Per-task memoization of a zone's UTC offset, used by the
[[DateTimeUtils.truncTimestamp]] hot
+ * path. The session zone is constant for a query and the offset is
piecewise-constant between DST
+ * transitions, so consecutive rows almost always resolve to the same offset.
The cache holds the
+ * half-open epoch-second interval `[lo, hi)` on which the offset is provably
constant -- derived
+ * from the surrounding zone transitions -- so a lookup that falls in the
interval reduces to two
+ * comparisons instead of a transition-array binary search.
+ *
+ * Not thread-safe by design: a fresh instance is created per task (codegen
mutable state) and used
+ * single-threaded, mirroring how stateful per-row helpers are scoped in
generated code.
+ */
+class ZoneOffsetCache(val zoneId: ZoneId) {
+ private val rules = zoneId.getRules
+ val isFixedOffset: Boolean = rules.isFixedOffset
+
+ private var cached = false
+ private var lo = 0L
+ private var hi = 0L
+ private var offsetSec = 0L
+
+ /**
+ * Offset in seconds at `epochSec`, equal to
+ * `rules.getOffset(Instant.ofEpochSecond(epochSec)).getTotalSeconds`,
memoized over the
+ * constant-offset interval containing the previous lookup.
+ */
+ def offsetSeconds(epochSec: Long): Long = {
+ if (cached && epochSec >= lo && epochSec < hi) {
+ offsetSec
+ } else {
+ val instant = Instant.ofEpochSecond(epochSec)
+ val o = rules.getOffset(instant).getTotalSeconds.toLong
+ if (isFixedOffset) {
+ lo = Long.MinValue
+ hi = Long.MaxValue
+ } else {
+ val nextT = rules.nextTransition(instant)
+ if (nextT == null) {
+ // No transition after `instant`: offset is constant on [epochSec,
+inf).
+ lo = epochSec
+ hi = Long.MaxValue
+ } else {
+ hi = nextT.toEpochSecond
+ // `hi - 1` lies strictly inside the constant-offset window ending
at `hi` (zone
+ // transitions are always more than a second apart), so its previous
transition is
+ // exactly that window's start. Anchoring on an interior point
avoids an off-by-one
+ // when `epochSec` sits exactly on a transition instant.
+ val prevT = rules.previousTransition(Instant.ofEpochSecond(hi - 1))
Review Comment:
Question (non-blocking): this interior-anchor makes `[lo, hi)` correct only
if `hi - 1` is guaranteed inside the constant-offset window — i.e. no two zone
transitions are ever <= 1s apart. Is that property guaranteed across the whole
tzdb, including historical / LMT transitions?
What makes me want it confirmed rather than assumed: the fast path's
`candidateOffsetSec != originalOffsetSec` fallback only guards a truncation
that *spans* a transition. It does **not** backstop a wrong `originalOffsetSec`
— if the window ever contained a transition, both the original and candidate
lookups would read the same wrong cached offset, the guard would compare equal,
and the wrong result would ship silently. So this invariant is load-bearing on
its own. If the >= 1s gap can't be guaranteed, could we degrade defensively
(e.g. narrow the window / fall back) when it's violated?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]