MaxGekk commented on code in PR #56739:
URL: https://github.com/apache/spark/pull/56739#discussion_r3472569444
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala:
##########
@@ -152,6 +152,73 @@ class ArrowWriterSuite extends SparkFunSuite {
check(new YearUDT, Seq(2020, 2021, null, 2022))
}
+ test("timestamp nanos round-trip") {
+ // Decompose an int64 epoch-nanoseconds value into the (epochMicros,
nanosWithinMicro) pair,
+ // matching how the Arrow reader reconstructs it.
+ def fromEpochNanos(nanos: Long): TimestampNanosVal =
+ TimestampNanosVal.fromParts(Math.floorDiv(nanos, 1000L),
Math.floorMod(nanos, 1000L).toShort)
+
+ val values = Seq(
+ TimestampNanosVal.fromParts(0L, 0.toShort),
+ TimestampNanosVal.fromParts(0L, 999.toShort),
+ TimestampNanosVal.fromParts(1234567L, 7.toShort),
+ // pre-epoch instant with a sub-microsecond remainder
+ TimestampNanosVal.fromParts(-1234567L, 13.toShort),
+ // large positive/negative epoch-nanoseconds within the representable
range
+ fromEpochNanos(9000000000000000000L),
+ fromEpochNanos(-9000000000000000000L))
+
+ def check(dt: DataType, timeZoneId: String): Unit = {
+ val schema = new StructType().add("value", dt, nullable = true)
+ val writer = ArrowWriter.create(schema, timeZoneId)
+ assert(writer.schema === schema)
+ // Append a trailing null to exercise the null path.
+ (values.map(Option(_)) :+ None).foreach { v =>
+ writer.write(InternalRow(v.orNull))
+ }
+ writer.finish()
+
+ val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0))
+ values.zipWithIndex.foreach { case (v, rowId) =>
+ val got = dt match {
+ case _: TimestampNTZNanosType => reader.getTimestampNTZNanos(rowId)
+ case _: TimestampLTZNanosType => reader.getTimestampLTZNanos(rowId)
+ }
+ assert(got.epochMicros === v.epochMicros)
+ assert(got.nanosWithinMicro === v.nanosWithinMicro)
+ }
+ assert(reader.isNullAt(values.length))
+ writer.root.close()
+ }
+
+ check(TimestampNTZNanosType(9), null)
+ check(TimestampLTZNanosType(9), "UTC")
+ // The value path packs the full nanosecond value regardless of the column
precision (precision
+ // is carried in the Arrow field metadata, not the value), so p=7
round-trips identically to
+ // p=9; exercising it guards the value path against a future
precision-enforcing change.
+ check(TimestampNTZNanosType(7), null)
+ check(TimestampLTZNanosType(7), "UTC")
+ }
+
+ test("timestamp nanos out of range raises DATETIME_OVERFLOW") {
+ def check(dt: DataType, timeZoneId: String): Unit = {
+ val schema = new StructType().add("value", dt, nullable = true)
+ val writer = ArrowWriter.create(schema, timeZoneId)
+ // epochMicros past the int64 epoch-nanosecond range overflows when
packed, but still
+ // renders as a valid Instant/LocalDateTime in the error message.
+ val tooLarge = TimestampNanosVal.fromParts(Long.MaxValue / 1000L + 1L,
0.toShort)
+ val e = intercept[SparkArithmeticException] {
+ writer.write(InternalRow(tooLarge))
+ }
+ assert(e.getCondition === "DATETIME_OVERFLOW")
+ assert(e.getMessage.contains("Arrow INT64"))
+ writer.root.close()
+ }
+
+ check(TimestampNTZNanosType(9), null)
+ check(TimestampLTZNanosType(9), "UTC")
+ }
Review Comment:
Added a direct `DateTimeUtilsSuite` unit test for the shared
`timestampNanosToEpochNanos` helper. It covers the sub-microsecond boundaries
(`0`/`999`), positive and pre-epoch values, the `floorDiv`/`floorMod` inverse
that the Arrow reader uses, the exact `Long.MaxValue` boundary, and the
`ArithmeticException` thrown on overflow. See commit `3f3a3d4`.
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala:
##########
@@ -86,6 +87,73 @@ class ArrowUtilsSuite extends SparkFunSuite {
roundtripWithTz(LA.getId)
}
+ test("timestamp nanos") {
+ // NTZ is zone-independent (null Arrow timezone); precision preserved via
field metadata.
+ Seq(7, 8, 9).foreach { p =>
+ val schema = new StructType().add("value", TimestampNTZNanosType(p))
+ val arrowSchema = ArrowUtils.toArrowSchema(schema, null, true, false)
+ val fieldType =
arrowSchema.findField("value").getType.asInstanceOf[ArrowType.Timestamp]
+ assert(fieldType.getUnit === TimeUnit.NANOSECOND)
+ assert(fieldType.getTimezone === null)
+ assert(ArrowUtils.fromArrowSchema(arrowSchema) === schema)
+ }
+
+ // LTZ is zone-aware: it requires a non-null session time zone; precision
preserved.
+ def roundtripLtz(timeZoneId: String): Unit = {
+ Seq(7, 8, 9).foreach { p =>
+ val schema = new StructType().add("value", TimestampLTZNanosType(p))
+ val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId, true,
false)
+ val fieldType =
arrowSchema.findField("value").getType.asInstanceOf[ArrowType.Timestamp]
+ assert(fieldType.getUnit === TimeUnit.NANOSECOND)
+ assert(fieldType.getTimezone === timeZoneId)
+ assert(ArrowUtils.fromArrowSchema(arrowSchema) === schema)
+ }
+ }
+ roundtripLtz(ZoneId.systemDefault().getId)
+ roundtripLtz("Asia/Tokyo")
+ roundtripLtz("UTC")
+ roundtripLtz(LA.getId)
+
+ // LTZ without a time zone is an error, mirroring TimestampType.
+ checkError(
+ exception = intercept[SparkException] {
+ ArrowUtils.toArrowSchema(
+ new StructType().add("value", TimestampLTZNanosType(9)), null, true,
false)
+ },
+ condition = "INTERNAL_ERROR",
+ parameters = Map("message" -> "Missing timezoneId where it is
mandatory."))
+
+ // Fallback: a nanosecond Arrow timestamp without precision metadata maps
to canonical p=9.
+ def nanosField(timeZoneId: String): Field = new Field(
+ "value",
+ new FieldType(true, new ArrowType.Timestamp(TimeUnit.NANOSECOND,
timeZoneId), null, null),
+ java.util.Collections.emptyList[Field]())
+ assert(ArrowUtils.fromArrowField(nanosField(null)) ===
TimestampNTZNanosType(9))
+ assert(ArrowUtils.fromArrowField(nanosField("UTC")) ===
TimestampLTZNanosType(9))
+
+ // Fallback also covers a present-but-invalid precision key (out of [7, 9]
or non-numeric):
+ // the value is unusable, so the type maps to the canonical p=9 just like
the no-metadata case.
+ def nanosFieldWithPrecision(timeZoneId: String, precision: String): Field
= new Field(
+ "value",
+ new FieldType(
+ true,
+ new ArrowType.Timestamp(TimeUnit.NANOSECOND, timeZoneId),
+ null,
+ java.util.Collections.singletonMap("SPARK::timestampNanos::precision",
precision)),
+ java.util.Collections.emptyList[Field]())
+ assert(
+ ArrowUtils.fromArrowField(nanosFieldWithPrecision(null, "5")) ===
TimestampNTZNanosType(9))
+ assert(
+ ArrowUtils.fromArrowField(nanosFieldWithPrecision("UTC", "x")) ===
TimestampLTZNanosType(9))
+
+ // The precision metadata key does not leak into the reconstructed column
Metadata.
+ val md = new MetadataBuilder().putString("city", "beijing").build()
+ val schemaWithMeta =
+ new StructType().add("value", TimestampNTZNanosType(7), nullable = true,
md)
+ assert(ArrowUtils.fromArrowSchema(
+ ArrowUtils.toArrowSchema(schemaWithMeta, null, true, false)) ===
schemaWithMeta)
+ }
+
Review Comment:
I split this into the part that is testable now and the part that depends on
the Connect wiring:
- **Added now** (`faf1333`): an end-to-end `ArrowConverters` IPC batch
round-trip in `ArrowConvertersSuite` (`toBatchIterator` -> `fromBatchIterator`)
for both NTZ and LTZ, with sub-micro/pre-epoch values and a trailing null. This
is the classic, `InternalRow`-level path, so it covers the full Arrow IPC
serialize -> deserialize and `ColumnarBatch`/`ArrowColumnVector` read side
end-to-end (strictly more than `ArrowWriterSuite`'s in-memory writer check).
- **Deferred**: the DataFrame-level `Dataset.toArrowBatchRdd` end-to-end test
needs the Connect / DataFrame wiring for these types, which is not in place
yet, so I've tracked it on SPARK-57160 rather than forcing it into this PR.
The PR description's "How was this patch tested?" section is updated to list
the new cases.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]