MaxGekk commented on code in PR #56739:
URL: https://github.com/apache/spark/pull/56739#discussion_r3472569444


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala:
##########
@@ -152,6 +152,73 @@ class ArrowWriterSuite extends SparkFunSuite {
     check(new YearUDT, Seq(2020, 2021, null, 2022))
   }
 
+  test("timestamp nanos round-trip") {
+    // Decompose an int64 epoch-nanoseconds value into the (epochMicros, nanosWithinMicro) pair,
+    // matching how the Arrow reader reconstructs it.
+    def fromEpochNanos(nanos: Long): TimestampNanosVal =
+      TimestampNanosVal.fromParts(Math.floorDiv(nanos, 1000L), 
Math.floorMod(nanos, 1000L).toShort)
+
+    val values = Seq(
+      TimestampNanosVal.fromParts(0L, 0.toShort),
+      TimestampNanosVal.fromParts(0L, 999.toShort),
+      TimestampNanosVal.fromParts(1234567L, 7.toShort),
+      // pre-epoch instant with a sub-microsecond remainder
+      TimestampNanosVal.fromParts(-1234567L, 13.toShort),
+      // large positive/negative epoch-nanoseconds within the representable range
+      fromEpochNanos(9000000000000000000L),
+      fromEpochNanos(-9000000000000000000L))
+
+    def check(dt: DataType, timeZoneId: String): Unit = {
+      val schema = new StructType().add("value", dt, nullable = true)
+      val writer = ArrowWriter.create(schema, timeZoneId)
+      assert(writer.schema === schema)
+      // Append a trailing null to exercise the null path.
+      (values.map(Option(_)) :+ None).foreach { v =>
+        writer.write(InternalRow(v.orNull))
+      }
+      writer.finish()
+
+      val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0))
+      values.zipWithIndex.foreach { case (v, rowId) =>
+        val got = dt match {
+          case _: TimestampNTZNanosType => reader.getTimestampNTZNanos(rowId)
+          case _: TimestampLTZNanosType => reader.getTimestampLTZNanos(rowId)
+        }
+        assert(got.epochMicros === v.epochMicros)
+        assert(got.nanosWithinMicro === v.nanosWithinMicro)
+      }
+      assert(reader.isNullAt(values.length))
+      writer.root.close()
+    }
+
+    check(TimestampNTZNanosType(9), null)
+    check(TimestampLTZNanosType(9), "UTC")
+    // The value path packs the full nanosecond value regardless of the column precision (precision
+    // is carried in the Arrow field metadata, not the value), so p=7 round-trips identically to
+    // p=9; exercising it guards the value path against a future precision-enforcing change.
+    check(TimestampNTZNanosType(7), null)
+    check(TimestampLTZNanosType(7), "UTC")
+  }
+
+  test("timestamp nanos out of range raises DATETIME_OVERFLOW") {
+    def check(dt: DataType, timeZoneId: String): Unit = {
+      val schema = new StructType().add("value", dt, nullable = true)
+      val writer = ArrowWriter.create(schema, timeZoneId)
+      // epochMicros past the int64 epoch-nanosecond range overflows when packed, but still
+      // renders as a valid Instant/LocalDateTime in the error message.
+      val tooLarge = TimestampNanosVal.fromParts(Long.MaxValue / 1000L + 1L, 
0.toShort)
+      val e = intercept[SparkArithmeticException] {
+        writer.write(InternalRow(tooLarge))
+      }
+      assert(e.getCondition === "DATETIME_OVERFLOW")
+      assert(e.getMessage.contains("Arrow INT64"))
+      writer.root.close()
+    }
+
+    check(TimestampNTZNanosType(9), null)
+    check(TimestampLTZNanosType(9), "UTC")
+  }

Review Comment:
   Added a direct `DateTimeUtilsSuite` unit test for the shared
   `timestampNanosToEpochNanos` helper. It covers the sub-microsecond
   boundaries (`0`/`999`), positive and pre-epoch values, the
   `floorDiv`/`floorMod` inverse that the Arrow reader uses, the exact
   `Long.MaxValue` boundary, and the `ArithmeticException` thrown on overflow.
   See commit `3f3a3d4`.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala:
##########
@@ -86,6 +87,73 @@ class ArrowUtilsSuite extends SparkFunSuite {
     roundtripWithTz(LA.getId)
   }
 
+  test("timestamp nanos") {
+    // NTZ is zone-independent (null Arrow timezone); precision preserved via field metadata.
+    Seq(7, 8, 9).foreach { p =>
+      val schema = new StructType().add("value", TimestampNTZNanosType(p))
+      val arrowSchema = ArrowUtils.toArrowSchema(schema, null, true, false)
+      val fieldType = 
arrowSchema.findField("value").getType.asInstanceOf[ArrowType.Timestamp]
+      assert(fieldType.getUnit === TimeUnit.NANOSECOND)
+      assert(fieldType.getTimezone === null)
+      assert(ArrowUtils.fromArrowSchema(arrowSchema) === schema)
+    }
+
+    // LTZ is zone-aware: it requires a non-null session time zone; precision preserved.
+    def roundtripLtz(timeZoneId: String): Unit = {
+      Seq(7, 8, 9).foreach { p =>
+        val schema = new StructType().add("value", TimestampLTZNanosType(p))
+        val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId, true, 
false)
+        val fieldType = 
arrowSchema.findField("value").getType.asInstanceOf[ArrowType.Timestamp]
+        assert(fieldType.getUnit === TimeUnit.NANOSECOND)
+        assert(fieldType.getTimezone === timeZoneId)
+        assert(ArrowUtils.fromArrowSchema(arrowSchema) === schema)
+      }
+    }
+    roundtripLtz(ZoneId.systemDefault().getId)
+    roundtripLtz("Asia/Tokyo")
+    roundtripLtz("UTC")
+    roundtripLtz(LA.getId)
+
+    // LTZ without a time zone is an error, mirroring TimestampType.
+    checkError(
+      exception = intercept[SparkException] {
+        ArrowUtils.toArrowSchema(
+          new StructType().add("value", TimestampLTZNanosType(9)), null, true, 
false)
+      },
+      condition = "INTERNAL_ERROR",
+      parameters = Map("message" -> "Missing timezoneId where it is 
mandatory."))
+
+    // Fallback: a nanosecond Arrow timestamp without precision metadata maps to canonical p=9.
+    def nanosField(timeZoneId: String): Field = new Field(
+      "value",
+      new FieldType(true, new ArrowType.Timestamp(TimeUnit.NANOSECOND, 
timeZoneId), null, null),
+      java.util.Collections.emptyList[Field]())
+    assert(ArrowUtils.fromArrowField(nanosField(null)) === 
TimestampNTZNanosType(9))
+    assert(ArrowUtils.fromArrowField(nanosField("UTC")) === 
TimestampLTZNanosType(9))
+
+    // Fallback also covers a present-but-invalid precision key (out of [7, 9] or non-numeric):
+    // the value is unusable, so the type maps to the canonical p=9 just like the no-metadata case.
+    def nanosFieldWithPrecision(timeZoneId: String, precision: String): Field 
= new Field(
+      "value",
+      new FieldType(
+        true,
+        new ArrowType.Timestamp(TimeUnit.NANOSECOND, timeZoneId),
+        null,
+        java.util.Collections.singletonMap("SPARK::timestampNanos::precision", 
precision)),
+      java.util.Collections.emptyList[Field]())
+    assert(
+      ArrowUtils.fromArrowField(nanosFieldWithPrecision(null, "5")) === 
TimestampNTZNanosType(9))
+    assert(
+      ArrowUtils.fromArrowField(nanosFieldWithPrecision("UTC", "x")) === 
TimestampLTZNanosType(9))
+
+    // The precision metadata key does not leak into the reconstructed column Metadata.
+    val md = new MetadataBuilder().putString("city", "beijing").build()
+    val schemaWithMeta =
+      new StructType().add("value", TimestampNTZNanosType(7), nullable = true, 
md)
+    assert(ArrowUtils.fromArrowSchema(
+      ArrowUtils.toArrowSchema(schemaWithMeta, null, true, false)) === 
schemaWithMeta)
+  }
+

Review Comment:
   I split this into the part that is testable now and the part that depends
   on the Connect wiring:
   
   - **Added now** (`faf1333`): an end-to-end `ArrowConverters` IPC batch
     round-trip in `ArrowConvertersSuite` (`toBatchIterator` ->
     `fromBatchIterator`) for both NTZ and LTZ, with sub-microsecond and
     pre-epoch values and a trailing null. This is the classic,
     `InternalRow`-level path, so it covers the full Arrow IPC
     serialize -> deserialize cycle and the `ColumnarBatch`/`ArrowColumnVector`
     read side end-to-end (strictly more than `ArrowWriterSuite`'s in-memory
     writer check).
   - **Deferred**: the DataFrame-level `Dataset.toArrowBatchRdd` end-to-end
     test needs the Connect / DataFrame wiring for these types, so I've
     tracked it in SPARK-57160 rather than forcing it into this PR.
   
   The PR description's "How was this patch tested?" section is updated to list 
the new cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to