MaxGekk commented on code in PR #56661:
URL: https://github.com/apache/spark/pull/56661#discussion_r3475520211
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/TimeTypeParquetOpsSuite.scala:
##########
@@ -129,6 +133,40 @@ class TimeTypeParquetOpsSuite extends SparkFunSuite {
// the TIME annotation; the raw-INT64 / TIMESTAMP / DECIMAL / group tests
// above already exercise the !isPrimitive and "non-TIME annotation" branches.
+ // ---------- vectorized read updater ----------
+
+ private def timeColumn(unit: TimeUnit): ColumnDescriptor =
+ new ColumnDescriptor(
+ Array("c"),
+ Types.primitive(INT64, REQUIRED).as(LogicalTypeAnnotation.timeType(false, unit)).named("c"),
+ 0, 0)
+
+ test("getVectorUpdater returns a framework updater for TimeType (micros and nanos)") {
+ assert(TimeTypeParquetOps(timeMicros).getVectorUpdater(timeColumn(TimeUnit.MICROS)).isDefined)
+ assert(TimeTypeParquetOps(timeNanos).getVectorUpdater(timeColumn(TimeUnit.NANOS)).isDefined)
+ // Java-friendly companion entry point used by ParquetVectorUpdaterFactory.
+ assert(ParquetTypeOps.getVectorUpdaterOrNull(timeMicros, timeColumn(TimeUnit.MICROS)) != null)
+ }
+
+ test("getVectorUpdater returns None for incompatible encodings (clean reject, vectorized path)") {
+ val int32Millis = Types.primitive(INT32, REQUIRED)
+ .as(LogicalTypeAnnotation.timeType(false, TimeUnit.MILLIS)).named("c")
+ val rawInt64 = Types.primitive(INT64, REQUIRED).named("c")
+ val int64Timestamp = Types.primitive(INT64, REQUIRED)
+ .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MICROS)).named("c")
+ // None -> the factory falls through to a clean SchemaColumnConvertNotSupportedException,
+ // matching the row-path reject set (requireCompatibleParquetType), instead of silently
+ // mis-decoding (e.g. readLongs over an INT32 column).
+ Seq(int32Millis, rawInt64, int64Timestamp).foreach { field =>
+ val descriptor = new ColumnDescriptor(Array("c"), field, 0, 0)
+ assert(TimeTypeParquetOps(timeMicros).getVectorUpdater(descriptor).isEmpty)
Review Comment:
This asserts the framework half of the reject (`getVectorUpdater` returns
`None`), but not the property the comment just above promises: that
`ParquetVectorUpdaterFactory.getUpdater(...)`, the entry point the vectorized
reader actually calls, *throws* `SchemaColumnConvertNotSupportedException` for
an incompatible TIME column. It does today (the incompatible INT32 and INT64
encodings match no arm and reach `throw constructConvertNotSupportedException`),
but nothing guards it: a future change to the factory's fall-through (a broad
`TimeType` arm or a catch-all) would reintroduce the silent mis-read this PR
fixes, and the `isEmpty` check wouldn't catch it.
Consider a factory-level reject test; `ParquetVectorUpdaterSuite` already
has the `newFactory(descriptor)` + `fac.getUpdater(...)` pattern (~L428):
```scala
intercept[SchemaColumnConvertNotSupportedException] {
  newFactory(int32TimeMillisDescriptor)
    .getUpdater(int32TimeMillisDescriptor, TimeType(6))
}
```
for INT32 TIME(MILLIS) / raw INT64 / INT64 TIMESTAMP. Non-blocking.
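
To illustrate why the `isEmpty` assertion alone leaves the gap open, here is a
minimal self-contained model of the two layers. Every name in it (`Encoding`,
`ConvertNotSupported`, `getUpdaterWithCatchAll`, and so on) is a hypothetical
stand-in rather than Spark's real classes or signatures, and the catch-all
variant is only an assumed example of the kind of regression meant above:

```scala
// Self-contained sketch: hypothetical stand-ins, NOT Spark's actual API.
object FactoryRejectSketch {
  sealed trait Encoding
  case object Int64TimeMicros extends Encoding
  case object Int32TimeMillis extends Encoding
  case object RawInt64 extends Encoding
  case object Int64TimestampMicros extends Encoding

  // Stand-in for SchemaColumnConvertNotSupportedException.
  final class ConvertNotSupported(msg: String) extends RuntimeException(msg)

  trait Updater
  object LongUpdater extends Updater

  // Framework half: Some(updater) only for the compatible encoding.
  def getVectorUpdater(e: Encoding): Option[Updater] = e match {
    case Int64TimeMicros => Some(LongUpdater)
    case _ => None
  }

  // Factory half as it behaves today in this model: None becomes a throw.
  def getUpdater(e: Encoding): Updater =
    getVectorUpdater(e).getOrElse(
      throw new ConvertNotSupported(s"unsupported encoding: $e"))

  // Assumed regression: a catch-all hands back an updater instead of throwing.
  def getUpdaterWithCatchAll(e: Encoding): Updater =
    getVectorUpdater(e).getOrElse(LongUpdater)

  // True when the given factory rejects the encoding with the expected error.
  def rejects(factory: Encoding => Updater, e: Encoding): Boolean =
    try { factory(e); false } catch { case _: ConvertNotSupported => true }

  def main(args: Array[String]): Unit = {
    val incompatible = Seq(Int32TimeMillis, RawInt64, Int64TimestampMicros)
    // The Option-level check holds no matter which factory is in use...
    assert(incompatible.forall(e => getVectorUpdater(e).isEmpty))
    // ...so only a factory-level check tells the two factories apart.
    assert(incompatible.forall(e => rejects(getUpdater, e)))
    assert(!incompatible.exists(e => rejects(getUpdaterWithCatchAll, e)))
  }
}
```

In this model the `isEmpty` assertion passes against both factories, while the
factory-level check fails for the catch-all variant, which is the behaviour the
suggested `intercept` test would pin down.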