cloud-fan commented on code in PR #56409: URL: https://github.com/apache/spark/pull/56409#discussion_r3400015481
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimestampNanosCast.scala: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.CAST +import org.apache.spark.sql.types.{ArrayType, DataType, DateType, MapType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType} + +/** + * Rewrites casts between [[DateType]] and the nanosecond-precision timestamp types + * ([[TimestampLTZNanosType]] / [[TimestampNTZNanosType]]) into a two-step cast that goes through + * the corresponding microsecond-precision timestamp type: + * + * - `nanos(p) -> DATE` ==> `nanos(p) -> micros -> DATE` + * - `DATE -> nanos(p)` ==> `DATE -> micros -> nanos(p)` + * + * where the microsecond counterpart is `TIMESTAMP` for `*_LTZ` and `TIMESTAMP_NTZ` for `*_NTZ`. + * Both component casts already exist (`nanos(p) <-> micros` and `micros <-> DATE`), so no dedicated + * `DATE <-> nanos` conversion is needed in [[Cast]] and the semantics match the microsecond + * `DATE <-> TIMESTAMP` casts: the LTZ directions are resolved in the session time zone and the NTZ + * directions on the UTC wall-clock grid; sub-microsecond digits and the time-of-day are dropped + * when narrowing to `DATE`. + * + * The same rewrite applies when the `DATE <-> nanos` pair is nested inside complex types + * ([[ArrayType]] / [[MapType]] / [[StructType]]) at any depth, e.g. + * `ARRAY<nanos(p)> -> ARRAY<DATE>` becomes `ARRAY<nanos(p)> -> ARRAY<micros> -> ARRAY<DATE>`. The + * intermediate type mirrors the structure (and nullability) of the target type, with every + * `DATE <-> nanos` leaf swapped for the corresponding microsecond-precision timestamp type, so that + * both component casts pass `Cast.canCast`. + * + * `Cast.canCast` intentionally does not allow `DATE <-> nanos` directly (it recurses into complex + * types, so nested pairs are rejected too), so such a cast stays unresolved until this rule + * rewrites it into the resolvable nested form. The new casts inherit the original `timeZoneId` and + * `evalMode`; the only zone-sensitive part (the `micros <-> DATE` cast) gets its session time zone + * from [[ResolveTimeZone]] within the same fixed-point batch. + * + * The per-cast rewrite is exposed via [[rewriteDateNanosCast]] so that the single-pass resolver + * (see `TimezoneAwareExpressionResolver`) can apply the same transformation and produce an + * identical plan; otherwise single-pass resolution would fail the cast's input type check while + * fixed-point succeeds. + */ +object ResolveTimestampNanosCast extends Rule[LogicalPlan] { + + /** The microsecond-precision timestamp counterpart of a nanosecond-precision timestamp type. */ + private def microTimestampType(dt: DataType): Option[DataType] = dt match { + case _: TimestampLTZNanosType => Some(TimestampType) + case _: TimestampNTZNanosType => Some(TimestampNTZType) + case _ => None + } + + /** + * Computes the intermediate ("bridge") type to route a `from -> to` cast through when the + * conversion involves a `DATE <-> nanos(p)` pair at any nesting depth. The bridge mirrors the + * structure (and nullability) of `to`, with every `DATE <-> nanos` leaf replaced by the + * corresponding microsecond-precision timestamp type. Returns `None` when no `DATE <-> nanos` + * pair is present, in which case [[Cast]] already handles the conversion directly. + */ + private def bridgeType(from: DataType, to: DataType): Option[DataType] = (from, to) match { + // Scalar DATE <-> nanos(p): route through the microsecond counterpart of the nanos side. + case (DateType, _) => microTimestampType(to) + case (_, DateType) => microTimestampType(from) + + case (ArrayType(fromEl, _), ArrayType(toEl, toContainsNull)) => + bridgeType(fromEl, toEl).map(ArrayType(_, toContainsNull)) + + case (MapType(fromKey, fromVal, _), MapType(toKey, toVal, toValContainsNull)) => + val keyBridge = bridgeType(fromKey, toKey) + val valBridge = bridgeType(fromVal, toVal) + if (keyBridge.isEmpty && valBridge.isEmpty) { + None + } else { + Some(MapType(keyBridge.getOrElse(toKey), valBridge.getOrElse(toVal), toValContainsNull)) + } + + case (StructType(fromFields), StructType(toFields)) + if fromFields.length == toFields.length => + val fieldBridges = fromFields.zip(toFields).map { + case (fromField, toField) => bridgeType(fromField.dataType, toField.dataType) + } + if (fieldBridges.forall(_.isEmpty)) { + None + } else { + Some(StructType(toFields.zip(fieldBridges).map { + case (toField, bridge) => bridge.map(bt => toField.copy(dataType = bt)).getOrElse(toField) + })) + } + + case _ => None + } + + /** + * If `cast` converts (recursively, through complex types) between [[DateType]] and a + * nanosecond-precision timestamp type, returns the equivalent two-step cast routed through the + * corresponding microsecond-precision timestamp type; returns `None` for any other cast. The + * nested casts inherit the original `timeZoneId` and `evalMode`. + */ + def rewriteDateNanosCast(cast: Cast): Option[Cast] = cast match { + case Cast(child, to, tz, mode) if child.resolved => + bridgeType(child.dataType, to).map { micros => Review Comment: The rewrite fires even when one of the rewritten legs is itself invalid, so the DATATYPE_MISMATCH error ends up naming the internal bridge type the user never wrote. Two repro shapes: `CAST(struct<a: nanos, b: binary> AS struct<a: date, b: int>)` — `bridgeType` fires on field `a`, but the `b` leg is uncastable, so the error reports `struct<a: timestamp_ntz, b: int>`; and (DataFrame API) `ARRAY<DATE> -> ARRAY<nanos(9)>` with `containsNull = false` on both sides — `forceNullable(DATE, TIMESTAMP_NTZ)` is true, so the inner leg can't resolve. Suggest returning `None` unless both legs pass the evalMode-appropriate capability check (the same `canCast`/`canAnsiCast`/`canTryCast` dispatch as `Cast.checkInputDataTypes`), so an invalid cast keeps failing in terms of the user's own types. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolver.scala: ########## @@ -45,7 +46,9 @@ class TimezoneAwareExpressionResolver(expressionResolver: ExpressionResolver) /** * Resolves a [[TimeZoneAwareExpression]] by resolving its children, applying a timezone * and calling [[coerceExpressionTypes]] on the result. If the expression is a [[Cast]], we apply - * [[collapseCast]] to the result. + * [[collapseCast]] to the result, and rewrite a `DATE <-> nanos` cast through the microsecond Review Comment: The method is `tryCollapseCast` — `[[collapseCast]]` doesn't resolve (pre-existing wrong name, but this line is being rewritten anyway): ```suggestion * [[tryCollapseCast]] to the result, and rewrite a `DATE <-> nanos` cast through the microsecond ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimestampNanosCast.scala: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.CAST +import org.apache.spark.sql.types.{ArrayType, DataType, DateType, MapType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType} + +/** + * Rewrites casts between [[DateType]] and the nanosecond-precision timestamp types + * ([[TimestampLTZNanosType]] / [[TimestampNTZNanosType]]) into a two-step cast that goes through + * the corresponding microsecond-precision timestamp type: + * + * - `nanos(p) -> DATE` ==> `nanos(p) -> micros -> DATE` + * - `DATE -> nanos(p)` ==> `DATE -> micros -> nanos(p)` + * + * where the microsecond counterpart is `TIMESTAMP` for `*_LTZ` and `TIMESTAMP_NTZ` for `*_NTZ`. + * Both component casts already exist (`nanos(p) <-> micros` and `micros <-> DATE`), so no dedicated + * `DATE <-> nanos` conversion is needed in [[Cast]] and the semantics match the microsecond + * `DATE <-> TIMESTAMP` casts: the LTZ directions are resolved in the session time zone and the NTZ + * directions on the UTC wall-clock grid; sub-microsecond digits and the time-of-day are dropped + * when narrowing to `DATE`. + * + * The same rewrite applies when the `DATE <-> nanos` pair is nested inside complex types + * ([[ArrayType]] / [[MapType]] / [[StructType]]) at any depth, e.g. + * `ARRAY<nanos(p)> -> ARRAY<DATE>` becomes `ARRAY<nanos(p)> -> ARRAY<micros> -> ARRAY<DATE>`. The + * intermediate type mirrors the structure (and nullability) of the target type, with every + * `DATE <-> nanos` leaf swapped for the corresponding microsecond-precision timestamp type, so that + * both component casts pass `Cast.canCast`. + * + * `Cast.canCast` intentionally does not allow `DATE <-> nanos` directly (it recurses into complex + * types, so nested pairs are rejected too), so such a cast stays unresolved until this rule + * rewrites it into the resolvable nested form. The new casts inherit the original `timeZoneId` and + * `evalMode`; the only zone-sensitive part (the `micros <-> DATE` cast) gets its session time zone + * from [[ResolveTimeZone]] within the same fixed-point batch. + * + * The per-cast rewrite is exposed via [[rewriteDateNanosCast]] so that the single-pass resolver + * (see `TimezoneAwareExpressionResolver`) can apply the same transformation and produce an + * identical plan; otherwise single-pass resolution would fail the cast's input type check while + * fixed-point succeeds. + */ +object ResolveTimestampNanosCast extends Rule[LogicalPlan] { Review Comment: On the open rule-vs-#56375 question (from the thread proposing reuse for `TIME` and `LTZ <-> NTZ`): I'd take the direct-Cast approach (#56375), for two reasons. 1. The rule's coordination costs showed up within this PR's own review: the single-pass mirror and the complex-type recursion in `bridgeType` each re-implement something `Cast` provides for free, and both were initially missed; the `USER_SPECIFIED_CAST` single-pass drop is still open. Every future bridged pair pays the same. There are also standing costs: `Cast.canCast` returns false for a cast SQL supports, EXPLAIN shows double casts, and a `Cast` constructed outside the analyzer never gets the rewrite. 2. The reuse argument caps out. The bridge needs an exact micros intermediate — fine for `DATE`, and for `TIME` (its max precision is 6, so sub-micro loss is target-mandated). But `TIMESTAMP_NTZ(p) <-> TIMESTAMP_LTZ(p)` can't route through micros without silently truncating sub-microsecond digits the target can represent — zone conversion shifts whole seconds, so a direct conversion preserves them. That pair will need direct `Cast` arms regardless, leaving the codebase with both mechanisms. The direct arms are one-line compositions of the same utilities the micros casts use (`daysToMicros`/`microsToDays` + `TimestampNanosVal.fromParts`), and this PR's test matrix pins micros-parity either way. Suggest reviving #56375 and folding in this PR's `canANSIStoreAssign`/`canUpCast` blocks and its test coverage (complex types, golden files, dual-run). ########## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimestampNanosCastSuite.scala: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import java.time.{LocalDate, LocalDateTime} + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, EvalMode, Expression, ExpressionEvalHelper, Literal, ScalarSubquery} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, OneRowRelation, Project} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, DateType, IntegerType, MapType, StringType, StructField, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType} +import org.apache.spark.unsafe.types.TimestampNanosVal + +/** + * Test suite for [[ResolveTimestampNanosCast]], which rewrites `DATE <-> nanos(p)` casts into a + * two-step cast through the microsecond timestamp type. + */ +class ResolveTimestampNanosCastSuite extends AnalysisTest with ExpressionEvalHelper { + + private val ntzNanos = TimestampNTZNanosType(TimestampNTZNanosType.MAX_PRECISION) + private val ltzNanos = TimestampLTZNanosType(TimestampLTZNanosType.MAX_PRECISION) + + private val ntzAttr = AttributeReference("ntz", ntzNanos)() + private val ltzAttr = AttributeReference("ltz", ltzNanos)() + private val dateAttr = AttributeReference("d", DateType)() + + // Complex-typed inputs that nest a nanos timestamp at various depths. + private val arrNtzAttr = AttributeReference("arr_ntz", ArrayType(ntzNanos, containsNull = true))() + private val arrDateAttr = AttributeReference("arr_d", ArrayType(DateType, containsNull = true))() + private val mapNtzAttr = + AttributeReference("map_ntz", MapType(StringType, ntzNanos, valueContainsNull = true))() + private val structNtzAttr = + AttributeReference("st_ntz", StructType(Seq(StructField("f", ntzNanos))))() + private val structMixedAttr = AttributeReference( + "st_mixed", + StructType(Seq(StructField("a", ntzNanos), StructField("b", IntegerType))))() + private val arrStructNtzAttr = AttributeReference( + "arr_st_ntz", + ArrayType(StructType(Seq(StructField("f", ntzNanos))), containsNull = true))() + + private val relation = LocalRelation( + ntzAttr, + ltzAttr, + dateAttr, + arrNtzAttr, + arrDateAttr, + mapNtzAttr, + structNtzAttr, + structMixedAttr, + arrStructNtzAttr) + + // Rewrite only: keeps the original time zone id so the structure can be compared exactly. + private object Rewrite extends RuleExecutor[LogicalPlan] { + val batches = Batch("rewrite", FixedPoint(10), ResolveTimestampNanosCast) :: Nil + } + + // Rewrite + time zone assignment, used to obtain a fully resolved, evaluable expression. + private object Analyze extends RuleExecutor[LogicalPlan] { + val batches = + Batch("analyze", FixedPoint(10), ResolveTimeZone, ResolveTimestampNanosCast) :: Nil + } + + private def micro(dt: DataType): DataType = dt match { + case _: TimestampLTZNanosType => TimestampType + case _: TimestampNTZNanosType => TimestampNTZType + } + + private def checkRewrite(in: Expression, out: Expression): Unit = { + comparePlans(Rewrite.execute(relation.select(in.as("c"))), relation.select(out.as("c"))) + } + + private def analyzeExpr(e: Expression): Expression = { + Analyze.execute(OneRowRelation().select(e.as("c"))) + .asInstanceOf[Project].projectList.head.asInstanceOf[Alias].child + } + + test("rewrite nanos(p) -> DATE through the microsecond timestamp type") { + Seq(ntzAttr, ltzAttr).foreach { attr => + checkRewrite( + Cast(attr, DateType), + Cast(Cast(attr, micro(attr.dataType)), DateType)) + } + } + + test("rewrite DATE -> nanos(p) through the microsecond timestamp type") { + Seq(ntzNanos, ltzNanos).foreach { nanos => + checkRewrite( + Cast(dateAttr, nanos), + Cast(Cast(dateAttr, micro(nanos)), nanos)) + } + } + + test("rewrite preserves timeZoneId and evalMode") { + val tz = Option(LA.getId) + Seq(EvalMode.LEGACY, EvalMode.ANSI, EvalMode.TRY).foreach { mode => + // nanos(p) -> DATE + checkRewrite( + Cast(ltzAttr, DateType, tz, mode), + Cast(Cast(ltzAttr, TimestampType, tz, mode), DateType, tz, mode)) + // DATE -> nanos(p) + checkRewrite( + Cast(dateAttr, ltzNanos, tz, mode), + Cast(Cast(dateAttr, TimestampType, tz, mode), ltzNanos, tz, mode)) + } + } + + test("rewrite is idempotent") { + val in = relation.select(Cast(ntzAttr, DateType).as("c")) + val once = Rewrite.execute(in) + comparePlans(Rewrite.execute(once), once) + } + + test("rewrite nanos(p) <-> DATE nested in an array") { + // ARRAY<nanos(p)> -> ARRAY<DATE> + checkRewrite( + Cast(arrNtzAttr, ArrayType(DateType, containsNull = true)), + Cast( + Cast(arrNtzAttr, ArrayType(TimestampNTZType, containsNull = true)), + ArrayType(DateType, containsNull = true))) + // ARRAY<DATE> -> ARRAY<nanos(p)> + checkRewrite( + Cast(arrDateAttr, ArrayType(ntzNanos, containsNull = true)), + Cast( + Cast(arrDateAttr, ArrayType(TimestampNTZType, containsNull = true)), + ArrayType(ntzNanos, containsNull = true))) + } + + test("rewrite nanos(p) -> DATE nested in a map value") { Review Comment: `bridgeType` also handles map keys (`keyBridge`), but the map tests only exercise the value side — one `MAP<nanos, x> -> MAP<date, x>` case would cover the key path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
