cloud-fan commented on code in PR #54297:
URL: https://github.com/apache/spark/pull/54297#discussion_r2862085077


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -592,6 +594,159 @@ case class ListAgg(
     false
   }
 
+  /**
+   * Returns true if the order value may be ambiguous after DISTINCT 
deduplication.
+   *
+   * For LISTAGG(DISTINCT child) WITHIN GROUP (ORDER BY order_expr), 
correctness requires
+   * a functional dependency (child -> order_expr, where equality is defined 
by GROUP BY
+   * semantics): each distinct child value must map to exactly one order 
value. Otherwise,
+   * after deduplication on child, the order value is ambiguous.
+   *
+   * When child = Cast(order_expr, T), child -> order_expr holds only if the cast is
+   * injective for the source type (e.g. TimestampType is not: during DST fall-back two
+   * distinct instants can format to the same string). In addition, the cast must be
+   * equality-preserving (order_expr -> child), meaning GROUP BY-equal order 
values must produce
+   * GROUP BY-equal child values. Otherwise, the DISTINCT rewrite (which 
groups by child) may split
+   * values that should be in the same group, causing over-counting.
+   * For example, Float/Double violate this because -0.0 and 0.0 are GROUP 
BY-equal but cast
+   * to different strings. This is what [[isCastEqualityPreserving]] checks.
+   *
+   * Currently only detects these conditions for Cast.
+   * TODO(SPARK-55718): extend to detect other functional dependencies.
+   *
+   * Returns false when the order expression matches the child (i.e., 
[[needSaveOrderValue]]
+   * is false). Otherwise, the behavior depends on the
+   * [[SQLConf.LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER]] config:
+   *  - If enabled, delegates to [[checkOrderValueDeterminism]] to determine 
whether the
+   *    order value is uniquely determined by the child.
+   *  - If disabled, any mismatch is considered ambiguous.
+   *
+   * @return true if ambiguity exists, false if the order value is 
deterministic
+   * @see [[throwDistinctOrderError]] to throw the appropriate error when this 
returns true
+   */
+  def hasDistinctOrderAmbiguity: Boolean = {
+    needSaveOrderValue && {
+      if (SQLConf.get.listaggAllowDistinctCastWithOrder) {
+        checkOrderValueDeterminism match {
+          case OrderDeterminismResult.Deterministic => false
+          case _ => true
+        }
+      } else {
+        true
+      }
+    }
+  }
+
+  def throwDistinctOrderError(): Nothing = {
+    if (SQLConf.get.listaggAllowDistinctCastWithOrder) {
+      checkOrderValueDeterminism match {
+        case OrderDeterminismResult.NonDeterministicMismatch =>
+          throwFunctionAndOrderExpressionMismatchError()
+        case OrderDeterminismResult.NonDeterministicCast(inputType, castType) 
=>
+          throwFunctionAndOrderExpressionUnsafeCastError(inputType, castType)
+        case OrderDeterminismResult.Deterministic =>
+          throw SparkException.internalError(
+            "ListAgg.throwDistinctOrderError should not be called when the 
cast is safe")
+      }
+    } else {
+      throwFunctionAndOrderExpressionMismatchError()
+    }
+  }
+
+  private def throwFunctionAndOrderExpressionMismatchError() = {
+    throw QueryCompilationErrors.functionAndOrderExpressionMismatchError(
+      prettyName, child, orderExpressions)
+  }
+
+  private def throwFunctionAndOrderExpressionUnsafeCastError(
+      inputType: DataType, castType: DataType) = {
+    throw QueryCompilationErrors.functionAndOrderExpressionUnsafeCastError(
+      prettyName, inputType, castType)
+  }
+
+  /**
+   * Checks whether the order value is uniquely determined by the child value.
+   *
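+   * Currently only handles the case where there is a single order expression and
+   * child = Cast(order_expr, T). If both the source type and the cast target type are
+   * equality-preserving, returns [[OrderDeterminismResult.Deterministic]] (each child
+   * value maps back to exactly one original value). If the cast is not
+   * equality-preserving, returns [[OrderDeterminismResult.NonDeterministicCast]] with the
+   * source and target types. In every other case (child is not a cast of the order
+   * expression, or there is more than one order expression), returns
+   * [[OrderDeterminismResult.NonDeterministicMismatch]].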
+   *
+   * @see [[hasDistinctOrderAmbiguity]]
+   */
+  private def checkOrderValueDeterminism: OrderDeterminismResult = {
+    if (orderExpressions.size != 1) return 
OrderDeterminismResult.NonDeterministicMismatch
+    child match {
+      case Cast(castChild, castType, _, _)
+        if orderExpressions.head.child.semanticEquals(castChild) =>
+          if (isCastEqualityPreserving(castChild.dataType) &&
+              isCastTargetEqualityPreserving(castType)) {
+            OrderDeterminismResult.Deterministic
+          } else {
+            OrderDeterminismResult.NonDeterministicCast(castChild.dataType, 
castType)
+          }
+      case _ => OrderDeterminismResult.NonDeterministicMismatch
+    }
+  }
+
+  /**
+   * Returns true if casting `dt` to string/binary preserves equality semantics in both
+   * directions: values that are GROUP BY-equal must cast to equal results, and equal
+   * results must come from GROUP BY-equal original values. Types like Float/Double are
+   * unsafe because IEEE 754 negative zero (-0.0) and positive zero (0.0) are equal but
+   * produce different string representations.
+   *
+   * @see [[checkOrderValueDeterminism]]
+   */
+  private def isCastEqualityPreserving(dt: DataType): Boolean = dt match {
+    case _: IntegerType | LongType | ShortType | ByteType => true
+    case _: DecimalType => true
+    case _: DateType | TimestampNTZType => true
+    case _: TimeType => true
+    case _: CalendarIntervalType => true
+    case _: YearMonthIntervalType => true
+    case _: DayTimeIntervalType => true
+    case BooleanType => true
+    case BinaryType => true
+    case st: StringType => st.isUTF8BinaryCollation
+    case _: DoubleType | FloatType => false
+    // During DST fall-back, two distinct UTC epochs can format to the same 
local time string
+    // because the default format omits the timezone offset. TimestampNTZType 
is safe (uses UTC).
+    case _: TimestampType => false
+    case _ => false

Review Comment:
   How about something like an array of int?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to