MaxGekk commented on code in PR #56612:
URL: https://github.com/apache/spark/pull/56612#discussion_r3447493675
##########
sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala:
##########
@@ -450,6 +450,89 @@ abstract class TimestampNanosFunctionsSuiteBase extends SharedSparkSession {
}
}
+ // ===== max_by / min_by over nanosecond-precision timestamps (SPARK-56822) =====
+ // `MaxBy`/`MinBy` gate only on the ordering expression's orderability
+ // (`MaxMinBy.checkInputDataTypes` -> `TypeUtils.checkForOrderingExpr`), which the nanosecond
+ // types pass (SPARK-57103); the value expression is unrestricted and `dataType = valueExpr
+ // .dataType`, so a nanosecond *value* is returned with its precision preserved. No change to the
+ // aggregates is needed -- these tests lock in both the nanos-as-value and nanos-as-ordering paths.
+
+ test("SPARK-57103: max_by/min_by return a nanosecond value and preserve its precision") {
+ Seq(7, 8, 9).foreach { p =>
+ // Value columns are nanos; the ordering column is a plain int key (max at k=3, min at k=1).
+ // The sub-microsecond parts are multiples of 100ns, so they are exact at every p in [7, 9]
+ // (no flooring) yet still non-zero -- proving the nanos value survives, not truncated to micros.
+ val schema = new StructType()
+ .add("ntz", TimestampNTZNanosType(p))
+ .add("ltz", TimestampLTZNanosType(p))
+ .add("k", IntegerType)
+ val data = Seq(
+ Row(LocalDateTime.parse("2020-01-01T00:00:00.000000100"),
+ Instant.parse("2020-01-01T00:00:00.000000100Z"), 1),
+ Row(LocalDateTime.parse("2020-01-01T00:00:00.000000900"),
+ Instant.parse("2020-01-01T00:00:00.000000900Z"), 3),
+ Row(LocalDateTime.parse("2020-01-01T00:00:00.000000500"),
+ Instant.parse("2020-01-01T00:00:00.000000500Z"), 2))
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+ val res = df.selectExpr(
+ "max_by(ntz, k)", "min_by(ntz, k)", "max_by(ltz, k)", "min_by(ltz, k)")
Review Comment:
This suite is for the nanos `functions.*` coverage, and its convention
(header lines 30-34) is "Most tests use the SQL path; several also cross-check
the Scala `Column` API" — the neighboring tests (`hour`/`minute`/`second`, the
"match the functions.* Column API over nanos" test, `max`/`min`,
`unix_timestamp`) all hit the Column API. These `max_by`/`min_by` tests use
only the SQL string path, so the public `functions.max_by`/`min_by(Column,
Column)` over nanos stays untested, and the SQL path here just duplicates the
golden `.sql` queries this PR adds. `functions._` is already imported, so:
```suggestion
      val res = df.select(
        max_by(col("ntz"), col("k")), min_by(col("ntz"), col("k")),
        max_by(col("ltz"), col("k")), min_by(col("ltz"), col("k")))
```
The same applies to the `selectExpr("max_by(label, ts)", ...)` at line 508 and
the `expr("max_by(label, ts)")` GROUP BY agg at line 531: use
`max_by(col("label"), col("ts"))` there too (or cross-check both paths, as the
sibling tests do).
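
For illustration, cross-checking both paths could look roughly like the sketch below. It is only a sketch under stated assumptions: it uses a local `SparkSession` and toy string/int columns instead of the nanosecond types this PR covers, it assumes a Spark version whose `functions` object exposes `max_by`/`min_by(Column, Column)`, and the object name `MaxByCrossCheck` is hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max_by, min_by}

// Hypothetical standalone sketch, not part of the suite under review.
object MaxByCrossCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("max-by-cross-check")
      .getOrCreate()
    import spark.implicits._

    // Toy data: `v` is the value column, `k` is the ordering key.
    val df = Seq(("a", 1), ("c", 3), ("b", 2)).toDF("v", "k")

    // SQL string path.
    val viaSql = df.selectExpr("max_by(v, k)", "min_by(v, k)")
    // Scala Column API path.
    val viaColumn = df.select(max_by(col("v"), col("k")), min_by(col("v"), col("k")))

    // Both paths are expected to produce the same single row.
    assert(viaSql.collect().toSeq == viaColumn.collect().toSeq)

    spark.stop()
  }
}
```

In the suite itself the same comparison would presumably go through the existing test helpers and the nanos columns; the sketch only shows the shape of the cross-check.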
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala:
##########
@@ -408,6 +408,20 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer
}
}
+ test("SPARK-57103: max_by/min_by accept a nanosecond ordering and preserve the value type") {
+ // MaxBy/MinBy gate only on the ordering expression's orderability; a nanosecond ordering is an
+ // orderable AtomicType (SPARK-57103). The value expression is unrestricted, and the result type
+ // is the value's type, so a nanosecond value is returned with its precision preserved.
+ Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(7)).foreach { dt =>
+ val v = AttributeReference("v", dt)()
+ val ord = AttributeReference("ord", dt)()
+ assert(MaxBy(v, ord).checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
+ assert(MinBy(v, ord).checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
+ assert(MaxBy(v, ord).dataType == dt)
+ assert(MinBy(v, ord).dataType == dt)
+ }
Review Comment:
Minor: `MaxBy(v, ord)` and `MinBy(v, ord)` are each constructed twice (once
for `checkInputDataTypes`, once for `dataType`). Iterating over the two
aggregates removes the double-construction and the repeated asserts:
```suggestion
    Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(7)).foreach { dt =>
      val v = AttributeReference("v", dt)()
      val ord = AttributeReference("ord", dt)()
      Seq(MaxBy(v, ord), MinBy(v, ord)).foreach { agg =>
        assert(agg.checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
        assert(agg.dataType == dt)
      }
    }
```
(The suite's `assertSuccess` helper isn't a fit here — it analyzes
`testRelation`, which has no nanos columns, and doesn't assert `dataType`,
which is the point of this test.)
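
As a rough illustration of why asserting `dataType` directly is useful, the same loop can be run with stock types where the value and ordering types differ, so the assertion distinguishes "result type follows the value" from "result type follows the ordering". This is a sketch only: it assumes the catalyst test classpath, uses `StringType`/`TimestampType` in place of the nanos types, and the object name `MaxMinByTypeSketch` is hypothetical.

```scala
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.aggregate.{MaxBy, MinBy}
import org.apache.spark.sql.types.{StringType, TimestampType}

// Hypothetical standalone sketch, not part of ExpressionTypeCheckingSuite.
object MaxMinByTypeSketch {
  def main(args: Array[String]): Unit = {
    // Value and ordering deliberately have different types.
    val v = AttributeReference("v", StringType)()
    val ord = AttributeReference("ord", TimestampType)()

    Seq(MaxBy(v, ord), MinBy(v, ord)).foreach { agg =>
      // The type check gates only on the ordering expression being orderable.
      assert(agg.checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
      // The result type is the value expression's type, not the ordering's.
      assert(agg.dataType == StringType)
    }
  }
}
```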
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]