cloud-fan commented on code in PR #56409:
URL: https://github.com/apache/spark/pull/56409#discussion_r3400015481


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimestampNanosCast.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Cast
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.CAST
+import org.apache.spark.sql.types.{ArrayType, DataType, DateType, MapType, 
StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, 
TimestampType}
+
+/**
+ * Rewrites casts between [[DateType]] and the nanosecond-precision timestamp 
types
+ * ([[TimestampLTZNanosType]] / [[TimestampNTZNanosType]]) into a two-step 
cast that goes through
+ * the corresponding microsecond-precision timestamp type:
+ *
+ *   - `nanos(p) -> DATE`  ==>  `nanos(p) -> micros -> DATE`
+ *   - `DATE -> nanos(p)`  ==>  `DATE -> micros -> nanos(p)`
+ *
+ * where the microsecond counterpart is `TIMESTAMP` for `*_LTZ` and 
`TIMESTAMP_NTZ` for `*_NTZ`.
+ * Both component casts already exist (`nanos(p) <-> micros` and `micros <-> 
DATE`), so no dedicated
+ * `DATE <-> nanos` conversion is needed in [[Cast]] and the semantics match 
the microsecond
+ * `DATE <-> TIMESTAMP` casts: the LTZ directions are resolved in the session 
time zone and the NTZ
+ * directions on the UTC wall-clock grid; sub-microsecond digits and the 
time-of-day are dropped
+ * when narrowing to `DATE`.
+ *
+ * The same rewrite applies when the `DATE <-> nanos` pair is nested inside 
complex types
+ * ([[ArrayType]] / [[MapType]] / [[StructType]]) at any depth, e.g.
+ * `ARRAY<nanos(p)> -> ARRAY<DATE>` becomes `ARRAY<nanos(p)> -> ARRAY<micros> 
-> ARRAY<DATE>`. The
+ * intermediate type mirrors the structure (and nullability) of the target 
type, with every
+ * `DATE <-> nanos` leaf swapped for the corresponding microsecond-precision 
timestamp type, so that
+ * both component casts pass `Cast.canCast`.
+ *
+ * `Cast.canCast` intentionally does not allow `DATE <-> nanos` directly (it 
recurses into complex
+ * types, so nested pairs are rejected too), so such a cast stays unresolved 
until this rule
+ * rewrites it into the resolvable nested form. The new casts inherit the 
original `timeZoneId` and
+ * `evalMode`; the only zone-sensitive part (the `micros <-> DATE` cast) gets 
its session time zone
+ * from [[ResolveTimeZone]] within the same fixed-point batch.
+ *
+ * The per-cast rewrite is exposed via [[rewriteDateNanosCast]] so that the 
single-pass resolver
+ * (see `TimezoneAwareExpressionResolver`) can apply the same transformation 
and produce an
+ * identical plan; otherwise single-pass resolution would fail the cast's 
input type check while
+ * fixed-point succeeds.
+ */
+object ResolveTimestampNanosCast extends Rule[LogicalPlan] {
+
+  /** The microsecond-precision timestamp counterpart of a 
nanosecond-precision timestamp type. */
+  private def microTimestampType(dt: DataType): Option[DataType] = dt match {
+    case _: TimestampLTZNanosType => Some(TimestampType)
+    case _: TimestampNTZNanosType => Some(TimestampNTZType)
+    case _ => None
+  }
+
+  /**
+   * Computes the intermediate ("bridge") type to route a `from -> to` cast 
through when the
+   * conversion involves a `DATE <-> nanos(p)` pair at any nesting depth. The 
bridge mirrors the
+   * structure (and nullability) of `to`, with every `DATE <-> nanos` leaf 
replaced by the
+   * corresponding microsecond-precision timestamp type. Returns `None` when 
no `DATE <-> nanos`
+   * pair is present, in which case [[Cast]] already handles the conversion 
directly.
+   */
+  private def bridgeType(from: DataType, to: DataType): Option[DataType] = 
(from, to) match {
+    // Scalar DATE <-> nanos(p): route through the microsecond counterpart of 
the nanos side.
+    case (DateType, _) => microTimestampType(to)
+    case (_, DateType) => microTimestampType(from)
+
+    case (ArrayType(fromEl, _), ArrayType(toEl, toContainsNull)) =>
+      bridgeType(fromEl, toEl).map(ArrayType(_, toContainsNull))
+
+    case (MapType(fromKey, fromVal, _), MapType(toKey, toVal, 
toValContainsNull)) =>
+      val keyBridge = bridgeType(fromKey, toKey)
+      val valBridge = bridgeType(fromVal, toVal)
+      if (keyBridge.isEmpty && valBridge.isEmpty) {
+        None
+      } else {
+        Some(MapType(keyBridge.getOrElse(toKey), valBridge.getOrElse(toVal), 
toValContainsNull))
+      }
+
+    case (StructType(fromFields), StructType(toFields))
+        if fromFields.length == toFields.length =>
+      val fieldBridges = fromFields.zip(toFields).map {
+        case (fromField, toField) => bridgeType(fromField.dataType, 
toField.dataType)
+      }
+      if (fieldBridges.forall(_.isEmpty)) {
+        None
+      } else {
+        Some(StructType(toFields.zip(fieldBridges).map {
+          case (toField, bridge) => bridge.map(bt => toField.copy(dataType = 
bt)).getOrElse(toField)
+        }))
+      }
+
+    case _ => None
+  }
+
+  /**
+   * If `cast` converts (recursively, through complex types) between 
[[DateType]] and a
+   * nanosecond-precision timestamp type, returns the equivalent two-step cast 
routed through the
+   * corresponding microsecond-precision timestamp type; returns `None` for 
any other cast. The
+   * nested casts inherit the original `timeZoneId` and `evalMode`.
+   */
+  def rewriteDateNanosCast(cast: Cast): Option[Cast] = cast match {
+    case Cast(child, to, tz, mode) if child.resolved =>
+      bridgeType(child.dataType, to).map { micros =>

Review Comment:
   The rewrite fires even when one of the rewritten legs is itself invalid, so 
the `DATATYPE_MISMATCH` error ends up naming the internal bridge type, which 
the user never wrote. Two repro shapes: `CAST(struct<a: nanos, b: binary> AS 
struct<a: date, b: int>)` — `bridgeType` fires on field `a`, but the `b` leg is 
uncastable, so the error reports `struct<a: timestamp_ntz, b: int>`; and 
(DataFrame API) `ARRAY<DATE> -> ARRAY<nanos(9)>` with `containsNull = false` on 
both sides — the bridge copies the target's `containsNull = false`, but 
`forceNullable(DATE, TIMESTAMP_NTZ)` is true, so the inner 
`ARRAY<DATE> -> ARRAY<TIMESTAMP_NTZ>` leg can't resolve.
   
   Suggest returning `None` unless both legs pass the evalMode-appropriate 
capability check (the same `canCast`/`canAnsiCast`/`canTryCast` dispatch as 
`Cast.checkInputDataTypes`), so an invalid cast keeps failing in terms of the 
user's own types.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimezoneAwareExpressionResolver.scala:
##########
@@ -45,7 +46,9 @@ class TimezoneAwareExpressionResolver(expressionResolver: 
ExpressionResolver)
   /**
    * Resolves a [[TimeZoneAwareExpression]] by resolving its children, 
applying a timezone
    * and calling [[coerceExpressionTypes]] on the result. If the expression is 
a [[Cast]], we apply
-   * [[collapseCast]] to the result.
+   * [[collapseCast]] to the result, and rewrite a `DATE <-> nanos` cast 
through the microsecond

Review Comment:
   The method is named `tryCollapseCast`, so the Scaladoc link 
`[[collapseCast]]` doesn't resolve (the wrong name is pre-existing, but this 
line is being rewritten anyway):
   ```suggestion
      * [[tryCollapseCast]] to the result, and rewrite a `DATE <-> nanos` cast through the microsecond
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimestampNanosCast.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Cast
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.CAST
+import org.apache.spark.sql.types.{ArrayType, DataType, DateType, MapType, 
StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, 
TimestampType}
+
+/**
+ * Rewrites casts between [[DateType]] and the nanosecond-precision timestamp 
types
+ * ([[TimestampLTZNanosType]] / [[TimestampNTZNanosType]]) into a two-step 
cast that goes through
+ * the corresponding microsecond-precision timestamp type:
+ *
+ *   - `nanos(p) -> DATE`  ==>  `nanos(p) -> micros -> DATE`
+ *   - `DATE -> nanos(p)`  ==>  `DATE -> micros -> nanos(p)`
+ *
+ * where the microsecond counterpart is `TIMESTAMP` for `*_LTZ` and 
`TIMESTAMP_NTZ` for `*_NTZ`.
+ * Both component casts already exist (`nanos(p) <-> micros` and `micros <-> 
DATE`), so no dedicated
+ * `DATE <-> nanos` conversion is needed in [[Cast]] and the semantics match 
the microsecond
+ * `DATE <-> TIMESTAMP` casts: the LTZ directions are resolved in the session 
time zone and the NTZ
+ * directions on the UTC wall-clock grid; sub-microsecond digits and the 
time-of-day are dropped
+ * when narrowing to `DATE`.
+ *
+ * The same rewrite applies when the `DATE <-> nanos` pair is nested inside 
complex types
+ * ([[ArrayType]] / [[MapType]] / [[StructType]]) at any depth, e.g.
+ * `ARRAY<nanos(p)> -> ARRAY<DATE>` becomes `ARRAY<nanos(p)> -> ARRAY<micros> 
-> ARRAY<DATE>`. The
+ * intermediate type mirrors the structure (and nullability) of the target 
type, with every
+ * `DATE <-> nanos` leaf swapped for the corresponding microsecond-precision 
timestamp type, so that
+ * both component casts pass `Cast.canCast`.
+ *
+ * `Cast.canCast` intentionally does not allow `DATE <-> nanos` directly (it 
recurses into complex
+ * types, so nested pairs are rejected too), so such a cast stays unresolved 
until this rule
+ * rewrites it into the resolvable nested form. The new casts inherit the 
original `timeZoneId` and
+ * `evalMode`; the only zone-sensitive part (the `micros <-> DATE` cast) gets 
its session time zone
+ * from [[ResolveTimeZone]] within the same fixed-point batch.
+ *
+ * The per-cast rewrite is exposed via [[rewriteDateNanosCast]] so that the 
single-pass resolver
+ * (see `TimezoneAwareExpressionResolver`) can apply the same transformation 
and produce an
+ * identical plan; otherwise single-pass resolution would fail the cast's 
input type check while
+ * fixed-point succeeds.
+ */
+object ResolveTimestampNanosCast extends Rule[LogicalPlan] {

Review Comment:
   On the open rule-vs-#56375 question (from the thread proposing reuse for 
`TIME` and `LTZ <-> NTZ`): I'd take the direct-Cast approach (#56375), for two 
reasons.
   
   1. The rule's coordination costs showed up within this PR's own review: the 
single-pass mirror and the complex-type recursion in `bridgeType` each 
re-implement something `Cast` provides for free, and both were initially 
missed; the `USER_SPECIFIED_CAST` single-pass drop is still open. Every future 
bridged pair pays the same. There are also standing costs: `Cast.canCast` 
returns false for a cast SQL supports, EXPLAIN shows double casts, and a `Cast` 
constructed outside the analyzer never gets the rewrite.
   2. The reuse argument only goes so far. The bridge is only correct when the 
microsecond intermediate loses nothing the target could keep — fine for `DATE`, 
and for `TIME` (its max precision is 6, so the sub-microsecond loss is mandated 
by the target type anyway). But `TIMESTAMP_NTZ(p) <-> TIMESTAMP_LTZ(p)` can't 
route through micros without silently truncating sub-microsecond digits that 
the target can represent — a zone conversion shifts the value by whole seconds, 
so a direct conversion preserves those digits. That pair will need direct 
`Cast` arms regardless, leaving the codebase with both mechanisms.
   
   The direct arms are one-line compositions of the same utilities the micros 
casts use (`daysToMicros`/`microsToDays` + `TimestampNanosVal.fromParts`), and 
this PR's test matrix pins micros-parity either way. Suggest reviving #56375 
and folding in this PR's `canANSIStoreAssign`/`canUpCast` blocks and its test 
coverage (complex types, golden files, dual-run).



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimestampNanosCastSuite.scala:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import java.time.{LocalDate, LocalDateTime}
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Cast, EvalMode, Expression, ExpressionEvalHelper, Literal, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, DateType, IntegerType, 
MapType, StringType, StructField, StructType, TimestampLTZNanosType, 
TimestampNTZNanosType, TimestampNTZType, TimestampType}
+import org.apache.spark.unsafe.types.TimestampNanosVal
+
+/**
+ * Test suite for [[ResolveTimestampNanosCast]], which rewrites `DATE <-> 
nanos(p)` casts into a
+ * two-step cast through the microsecond timestamp type.
+ */
+class ResolveTimestampNanosCastSuite extends AnalysisTest with 
ExpressionEvalHelper {
+
+  private val ntzNanos = 
TimestampNTZNanosType(TimestampNTZNanosType.MAX_PRECISION)
+  private val ltzNanos = 
TimestampLTZNanosType(TimestampLTZNanosType.MAX_PRECISION)
+
+  private val ntzAttr = AttributeReference("ntz", ntzNanos)()
+  private val ltzAttr = AttributeReference("ltz", ltzNanos)()
+  private val dateAttr = AttributeReference("d", DateType)()
+
+  // Complex-typed inputs that nest a nanos timestamp at various depths.
+  private val arrNtzAttr = AttributeReference("arr_ntz", ArrayType(ntzNanos, 
containsNull = true))()
+  private val arrDateAttr = AttributeReference("arr_d", ArrayType(DateType, 
containsNull = true))()
+  private val mapNtzAttr =
+    AttributeReference("map_ntz", MapType(StringType, ntzNanos, 
valueContainsNull = true))()
+  private val structNtzAttr =
+    AttributeReference("st_ntz", StructType(Seq(StructField("f", ntzNanos))))()
+  private val structMixedAttr = AttributeReference(
+    "st_mixed",
+    StructType(Seq(StructField("a", ntzNanos), StructField("b", 
IntegerType))))()
+  private val arrStructNtzAttr = AttributeReference(
+    "arr_st_ntz",
+    ArrayType(StructType(Seq(StructField("f", ntzNanos))), containsNull = 
true))()
+
+  private val relation = LocalRelation(
+    ntzAttr,
+    ltzAttr,
+    dateAttr,
+    arrNtzAttr,
+    arrDateAttr,
+    mapNtzAttr,
+    structNtzAttr,
+    structMixedAttr,
+    arrStructNtzAttr)
+
+  // Rewrite only: keeps the original time zone id so the structure can be 
compared exactly.
+  private object Rewrite extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("rewrite", FixedPoint(10), ResolveTimestampNanosCast) 
:: Nil
+  }
+
+  // Rewrite + time zone assignment, used to obtain a fully resolved, 
evaluable expression.
+  private object Analyze extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("analyze", FixedPoint(10), ResolveTimeZone, 
ResolveTimestampNanosCast) :: Nil
+  }
+
+  private def micro(dt: DataType): DataType = dt match {
+    case _: TimestampLTZNanosType => TimestampType
+    case _: TimestampNTZNanosType => TimestampNTZType
+  }
+
+  private def checkRewrite(in: Expression, out: Expression): Unit = {
+    comparePlans(Rewrite.execute(relation.select(in.as("c"))), 
relation.select(out.as("c")))
+  }
+
+  private def analyzeExpr(e: Expression): Expression = {
+    Analyze.execute(OneRowRelation().select(e.as("c")))
+      .asInstanceOf[Project].projectList.head.asInstanceOf[Alias].child
+  }
+
+  test("rewrite nanos(p) -> DATE through the microsecond timestamp type") {
+    Seq(ntzAttr, ltzAttr).foreach { attr =>
+      checkRewrite(
+        Cast(attr, DateType),
+        Cast(Cast(attr, micro(attr.dataType)), DateType))
+    }
+  }
+
+  test("rewrite DATE -> nanos(p) through the microsecond timestamp type") {
+    Seq(ntzNanos, ltzNanos).foreach { nanos =>
+      checkRewrite(
+        Cast(dateAttr, nanos),
+        Cast(Cast(dateAttr, micro(nanos)), nanos))
+    }
+  }
+
+  test("rewrite preserves timeZoneId and evalMode") {
+    val tz = Option(LA.getId)
+    Seq(EvalMode.LEGACY, EvalMode.ANSI, EvalMode.TRY).foreach { mode =>
+      // nanos(p) -> DATE
+      checkRewrite(
+        Cast(ltzAttr, DateType, tz, mode),
+        Cast(Cast(ltzAttr, TimestampType, tz, mode), DateType, tz, mode))
+      // DATE -> nanos(p)
+      checkRewrite(
+        Cast(dateAttr, ltzNanos, tz, mode),
+        Cast(Cast(dateAttr, TimestampType, tz, mode), ltzNanos, tz, mode))
+    }
+  }
+
+  test("rewrite is idempotent") {
+    val in = relation.select(Cast(ntzAttr, DateType).as("c"))
+    val once = Rewrite.execute(in)
+    comparePlans(Rewrite.execute(once), once)
+  }
+
+  test("rewrite nanos(p) <-> DATE nested in an array") {
+    // ARRAY<nanos(p)> -> ARRAY<DATE>
+    checkRewrite(
+      Cast(arrNtzAttr, ArrayType(DateType, containsNull = true)),
+      Cast(
+        Cast(arrNtzAttr, ArrayType(TimestampNTZType, containsNull = true)),
+        ArrayType(DateType, containsNull = true)))
+    // ARRAY<DATE> -> ARRAY<nanos(p)>
+    checkRewrite(
+      Cast(arrDateAttr, ArrayType(ntzNanos, containsNull = true)),
+      Cast(
+        Cast(arrDateAttr, ArrayType(TimestampNTZType, containsNull = true)),
+        ArrayType(ntzNanos, containsNull = true)))
+  }
+
+  test("rewrite nanos(p) -> DATE nested in a map value") {

Review Comment:
   `bridgeType` also handles map keys (`keyBridge`), but the map tests only 
exercise the value side — one `MAP<nanos, x> -> MAP<date, x>` case would cover 
the key path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to