MaxGekk commented on code in PR #56612:
URL: https://github.com/apache/spark/pull/56612#discussion_r3447493675


##########
sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala:
##########
@@ -450,6 +450,89 @@ abstract class TimestampNanosFunctionsSuiteBase extends SharedSparkSession {
     }
   }
 
+  // ===== max_by / min_by over nanosecond-precision timestamps (SPARK-56822) =====
+  // `MaxBy`/`MinBy` gate only on the ordering expression's orderability
+  // (`MaxMinBy.checkInputDataTypes` -> `TypeUtils.checkForOrderingExpr`), which the nanosecond
+  // types pass (SPARK-57103); the value expression is unrestricted and `dataType = valueExpr
+  // .dataType`, so a nanosecond *value* is returned with its precision preserved. No change to the
+  // aggregates is needed -- these tests lock in both the nanos-as-value and nanos-as-ordering paths.
+
+  test("SPARK-57103: max_by/min_by return a nanosecond value and preserve its precision") {
+    Seq(7, 8, 9).foreach { p =>
+      // Value columns are nanos; the ordering column is a plain int key (max at k=3, min at k=1).
+      // The sub-microsecond parts are multiples of 100ns, so they are exact at every p in [7, 9]
+      // (no flooring) yet still non-zero -- proving the nanos value survives, not truncated to micros.
+      val schema = new StructType()
+        .add("ntz", TimestampNTZNanosType(p))
+        .add("ltz", TimestampLTZNanosType(p))
+        .add("k", IntegerType)
+      val data = Seq(
+        Row(LocalDateTime.parse("2020-01-01T00:00:00.000000100"),
+          Instant.parse("2020-01-01T00:00:00.000000100Z"), 1),
+        Row(LocalDateTime.parse("2020-01-01T00:00:00.000000900"),
+          Instant.parse("2020-01-01T00:00:00.000000900Z"), 3),
+        Row(LocalDateTime.parse("2020-01-01T00:00:00.000000500"),
+          Instant.parse("2020-01-01T00:00:00.000000500Z"), 2))
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+      val res = df.selectExpr(
+        "max_by(ntz, k)", "min_by(ntz, k)", "max_by(ltz, k)", "min_by(ltz, k)")

Review Comment:
   This suite covers the nanos `functions.*` API, and its convention (header
lines 30-34) is "Most tests use the SQL path; several also cross-check the
Scala `Column` API". The neighboring tests (`hour`/`minute`/`second`, the
"match the functions.* Column API over nanos" test, `max`/`min`,
`unix_timestamp`) all exercise the Column API. These `max_by`/`min_by` tests
use only the SQL string path, so the public `functions.max_by`/`min_by(Column,
Column)` over nanos stays untested, and the SQL path here just duplicates the
golden `.sql` queries this PR adds. `functions._` is already imported, so:
   ```suggestion
         val res = df.select(
           max_by(col("ntz"), col("k")), min_by(col("ntz"), col("k")),
           max_by(col("ltz"), col("k")), min_by(col("ltz"), col("k")))
   ```
   The same applies to the `selectExpr("max_by(label, ts)", ...)` at line 508
and the `expr("max_by(label, ts)")` GROUP BY agg at line 531: use
`max_by(col("label"), col("ts"))` there too (or cross-check both paths, as the
sibling tests do).
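
   For reference, a minimal standalone sketch of what "cross-check both paths"
could look like. It is not taken from this PR: the object name
`MaxByCrossCheck` and the local `SparkSession` setup are hypothetical, and it
uses the built-in microsecond `TimestampType` as a stand-in because the nanos
types are only introduced by this work. Inside the suite the same comparison
would presumably go through the existing `df` and the suite's own assertion
helpers instead.

   ```scala
   import java.sql.Timestamp

   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{col, max_by, min_by}

   // Hypothetical standalone driver; illustration only, not part of the PR.
   object MaxByCrossCheck {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .master("local[1]")
         .appName("max_by-cross-check")
         .getOrCreate()
       import spark.implicits._
       try {
         // Stand-in data: microsecond timestamps as values, an int key as ordering
         // (max at k=3, min at k=1), mirroring the shape of the test under review.
         val df = Seq(
           (Timestamp.valueOf("2020-01-01 00:00:00.000001"), 1),
           (Timestamp.valueOf("2020-01-01 00:00:00.000009"), 3),
           (Timestamp.valueOf("2020-01-01 00:00:00.000005"), 2)
         ).toDF("ts", "k")

         // SQL string path.
         val viaSql = df.selectExpr("max_by(ts, k)", "min_by(ts, k)").collect().toSeq
         // Public Scala Column API path.
         val viaColumn = df
           .select(max_by(col("ts"), col("k")), min_by(col("ts"), col("k")))
           .collect()
           .toSeq

         // Both paths should resolve to the same aggregates and return the same rows.
         assert(viaSql == viaColumn, s"SQL path $viaSql != Column path $viaColumn")
       } finally {
         spark.stop()
       }
     }
   }
   ```

   Comparing the two result sets covers `functions.max_by`/`min_by` without
dropping the SQL path, which is the pattern the sibling tests follow.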



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala:
##########
@@ -408,6 +408,20 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer
     }
   }
 
+  test("SPARK-57103: max_by/min_by accept a nanosecond ordering and preserve the value type") {
+    // MaxBy/MinBy gate only on the ordering expression's orderability; a nanosecond ordering is an
+    // orderable AtomicType (SPARK-57103). The value expression is unrestricted, and the result type
+    // is the value's type, so a nanosecond value is returned with its precision preserved.
+    Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(7)).foreach { dt =>
+      val v = AttributeReference("v", dt)()
+      val ord = AttributeReference("ord", dt)()
+      assert(MaxBy(v, ord).checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
+      assert(MinBy(v, ord).checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
+      assert(MaxBy(v, ord).dataType == dt)
+      assert(MinBy(v, ord).dataType == dt)
+    }

Review Comment:
   Minor: `MaxBy(v, ord)` and `MinBy(v, ord)` are each constructed twice (once 
for `checkInputDataTypes`, once for `dataType`). Iterating over the two 
aggregates removes the double-construction and the repeated asserts:
   ```suggestion
       Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(7)).foreach { dt =>
         val v = AttributeReference("v", dt)()
         val ord = AttributeReference("ord", dt)()
         Seq(MaxBy(v, ord), MinBy(v, ord)).foreach { agg =>
           assert(agg.checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
           assert(agg.dataType == dt)
         }
       }
   ```
   (The suite's `assertSuccess` helper isn't a fit here: it analyzes
`testRelation`, which has no nanos columns, and it doesn't assert `dataType`,
which is the point of this test.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

