MaxGekk commented on code in PR #56739:
URL: https://github.com/apache/spark/pull/56739#discussion_r3468300520


##########
sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala:
##########
@@ -125,6 +130,24 @@ private[sql] object ArrowUtils {
     }
   }
 
+  /**
+   * Builds an Arrow field for a nanosecond timestamp type, stashing the 
column precision in the
+   * field metadata (alongside the user metadata) so it can be recovered in 
`fromArrowField`.
+   */
+  private def toTimestampNanosArrowField(

Review Comment:
   Good catch, and agreed this is a coordination item rather than something to 
fix in this PR's code.
   
   One thing to flag for whoever wires it up: adding the two nanos cases to 
`isSupportedByArrow` alone isn't sufficient. In #56334 the Arrow-cache 
**reader** dispatch also omits them (`case LongType | TimestampType | 
TimestampNTZType | _: DayTimeIntervalType | _: TimeType => ...`, falling to 
`case _ =>`), as do the column-stats and min/max paths. Flipping only the gate 
to `true` without those would turn today's silent fallback into a hard failure 
- so nanos needs to be handled end-to-end (gate + reader + stats) in the same 
change.
   
   Since #56334 owns both the method and the reader, I think the cleanest place 
for the full nanos coverage is there (or a follow-up once both land), rather 
than a partial gate change here. The nanos types are also still feature-gated 
behind `spark.sql.timestampNanosTypes.enabled` and unreleased, so there's no 
rush. I'll leave a note to track it.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala:
##########
@@ -152,6 +152,68 @@ class ArrowWriterSuite extends SparkFunSuite {
     check(new YearUDT, Seq(2020, 2021, null, 2022))
   }
 
+  test("timestamp nanos round-trip") {
+    // Decompose an int64 epoch-nanoseconds value into the (epochMicros, 
nanosWithinMicro) pair,
+    // matching how the Arrow reader reconstructs it.
+    def fromEpochNanos(nanos: Long): TimestampNanosVal =
+      TimestampNanosVal.fromParts(Math.floorDiv(nanos, 1000L), 
Math.floorMod(nanos, 1000L).toShort)
+
+    val values = Seq(
+      TimestampNanosVal.fromParts(0L, 0.toShort),
+      TimestampNanosVal.fromParts(0L, 999.toShort),
+      TimestampNanosVal.fromParts(1234567L, 7.toShort),
+      // pre-epoch instant with a sub-microsecond remainder
+      TimestampNanosVal.fromParts(-1234567L, 13.toShort),
+      // large positive/negative epoch-nanoseconds within the representable 
range
+      fromEpochNanos(9000000000000000000L),
+      fromEpochNanos(-9000000000000000000L))
+
+    def check(dt: DataType, timeZoneId: String): Unit = {
+      val schema = new StructType().add("value", dt, nullable = true)
+      val writer = ArrowWriter.create(schema, timeZoneId)
+      assert(writer.schema === schema)
+      // Append a trailing null to exercise the null path.
+      (values.map(Option(_)) :+ None).foreach { v =>
+        writer.write(InternalRow(v.orNull))
+      }
+      writer.finish()
+
+      val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0))
+      values.zipWithIndex.foreach { case (v, rowId) =>
+        val got = dt match {
+          case _: TimestampNTZNanosType => reader.getTimestampNTZNanos(rowId)
+          case _: TimestampLTZNanosType => reader.getTimestampLTZNanos(rowId)
+        }
+        assert(got.epochMicros === v.epochMicros)
+        assert(got.nanosWithinMicro === v.nanosWithinMicro)
+      }
+      assert(reader.isNullAt(values.length))
+      writer.root.close()
+    }
+
+    check(TimestampNTZNanosType(9), null)
+    check(TimestampLTZNanosType(9), "UTC")
+  }
+
+  test("timestamp nanos out of range raises DATETIME_OVERFLOW") {
+    def check(dt: DataType, timeZoneId: String): Unit = {
+      val schema = new StructType().add("value", dt, nullable = true)
+      val writer = ArrowWriter.create(schema, timeZoneId)
+      // epochMicros past the int64 epoch-nanosecond range overflows when 
packed, but still
+      // renders as a valid Instant/LocalDateTime in the error message.
+      val tooLarge = TimestampNanosVal.fromParts(Long.MaxValue / 1000L + 1L, 
0.toShort)
+      val e = intercept[SparkArithmeticException] {
+        writer.write(InternalRow(tooLarge))
+      }
+      assert(e.getCondition === "DATETIME_OVERFLOW")
+      assert(e.getMessage.contains("Arrow INT64"))
+      writer.root.close()
+    }
+
+    check(TimestampNTZNanosType(9), null)
+    check(TimestampLTZNanosType(9), "UTC")
+  }
+

Review Comment:
   Thanks - agreed. I've added a `p=7` value round-trip case for both NTZ and 
LTZ. The value path packs the full nanosecond value regardless of the column 
precision (precision is carried in the Arrow field metadata, not the value), so 
`p=7` round-trips identically to `p=9` today, but exercising it guards the 
value path against a future precision-enforcing change. I'll push it once CI on 
the current commit completes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to