cloud-fan commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1862225746


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -265,3 +272,280 @@ private[aggregate] object CollectTopK {
     case _ => throw QueryCompilationErrors.invalidNumParameter(e)
   }
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, delimiter])[ WITHIN GROUP (ORDER BY key [ASC | DESC] [,...])] - Returns
+    the concatenation of non-null input values, separated by the delimiter, ordered by key.
+    If all values are null, null is returned.
+    """,
+  arguments = """
+    Arguments:
+      * expr - a string or binary expression to be concatenated.
+      * delimiter - an optional string or binary foldable expression used to separate the input values.
+        If null, the concatenation will be performed without a delimiter. Default is null.
+      * key - an optional expression for ordering the input values. Multiple keys can be specified.
+        If none are specified, the order of the rows in the result is non-deterministic.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       abc
+      > SELECT _FUNC_(col) WITHIN GROUP (ORDER BY col DESC) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       cba
+      > SELECT _FUNC_(col) FROM VALUES ('a'), (NULL), ('b') AS tab(col);
+       ab
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       aa
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       ab
+      > SELECT _FUNC_(col, ', ') FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a, b, c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  note = """
+    * If the order is not specified, the function is non-deterministic because
+    the order of the rows may be non-deterministic after a shuffle.
+    * If DISTINCT is specified, then expr and key must be the same expression.
+  """,
+  group = "agg_funcs",
+  since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal(null),
+    orderExpressions: Seq[SortOrder] = Nil,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends Collect[mutable.ArrayBuffer[Any]]
+  with SupportsOrderingWithinGroup
+  with ImplicitCastInputTypes {
+
+  override def orderingFilled: Boolean = orderExpressions.nonEmpty
+
+  override def isOrderingMandatory: Boolean = false
+
+  override def isDistinctSupported: Boolean = true
+
+  override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction =
+    copy(orderExpressions = orderingWithinGroup)
+
+  override protected lazy val bufferElementType: DataType = {
+    if (!needSaveOrderValue) {
+      child.dataType
+    } else {
+      StructType(
+        StructField("value", child.dataType)
+        +: orderValuesField
+      )
+    }
+  }
+  /** Indicates that the result of [[child]] is not enough for evaluation. */
+  lazy val needSaveOrderValue: Boolean = !isOrderCompatible(orderExpressions)
+
+  def this(child: Expression) =
+    this(child, Literal(null), Nil, 0, 0)
+
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, Nil, 0, 0)
+
+  override def nullable: Boolean = true
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def defaultResult: Option[Literal] = Option(Literal.create(null, dataType))
+
+  override def sql(isDistinct: Boolean): String = {
+    val distinct = if (isDistinct) "DISTINCT " else ""
+    val withinGroup = if (orderingFilled) {
+      s" WITHIN GROUP (ORDER BY ${orderExpressions.map(_.sql).mkString(", ")})"
+    } else {
+      ""
+    }
+    s"$prettyName($distinct${child.sql}, ${delimiter.sql})$withinGroup"
+  }
+
+  override def inputTypes: Seq[AbstractDataType] =
+    TypeCollection(
+      StringTypeWithCollation(supportsTrimCollation = true),
+      BinaryType
+    ) +:
+    TypeCollection(
+      StringTypeWithCollation(supportsTrimCollation = true),
+      BinaryType,
+      NullType
+    ) +:
+    orderExpressions.map(_ => AnyDataType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val matchInputTypes = super.checkInputDataTypes()
+    if (matchInputTypes.isFailure) {
+      matchInputTypes
+    } else if (!delimiter.foldable) {
+      DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> toSQLId("delimiter"),
+          "inputType" -> toSQLType(delimiter.dataType),
+          "inputExpr" -> toSQLExpr(delimiter)
+        )
+      )
+    } else if (delimiter.dataType == NullType) {
+      // null is the default (empty) delimiter, so its type is not important
+      TypeCheckSuccess
+    } else {
+      TypeUtils.checkForSameTypeInputExpr(child.dataType :: delimiter.dataType :: Nil, prettyName)
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val sortedBufferWithoutNulls = sortBuffer(buffer)
+      concatSkippingNulls(sortedBufferWithoutNulls)
+    } else {
+      null
+    }
+  }
+
+  /**
+   * Sorts the buffer according to [[orderExpressions]].
+   * If [[orderExpressions]] is empty, returns the buffer as is.
+   * The format of the buffer is determined by [[needSaveOrderValue]].
+   * @return the sorted buffer containing only the child's values
+   */
+  private[this] def sortBuffer(buffer: mutable.ArrayBuffer[Any]): mutable.ArrayBuffer[Any] = {
+    if (!orderingFilled) {
+      // Without ordering, return the buffer as is.
+      return buffer
+    }
+    if (!needSaveOrderValue) {
+      // Here the buffer has structure [childValue0, childValue1, ...]
+      // and we want to sort it by childValues
+      val sortOrderExpression = orderExpressions.head
+      val ascendingOrdering = PhysicalDataType.ordering(sortOrderExpression.dataType)
+      val ordering =
+        if (sortOrderExpression.direction == Ascending) ascendingOrdering

Review Comment:
   I see, but the Spark native sorter should be more efficient and support spilling.
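
   For anyone following along, a rough sketch of the direction suggested here: feed the buffered values through Spark's spill-capable `ExternalSorter` instead of sorting the whole `ArrayBuffer` in memory. This is illustrative only, not the PR's implementation; `ExternalSorter` is `private[spark]`, so real code would have to live under an `org.apache.spark` package (or go through the planner's sort-based aggregation path), and the helper name below is hypothetical:

   ```scala
   package org.apache.spark.util.collection

   import org.apache.spark.TaskContext

   // Rough sketch only: sort buffered aggregation values with spilling support.
   // `ordering` would be derived from the function's SortOrder expressions.
   object ListAggSortSketch {
     def sortWithSpilling[T](values: Iterator[T], ordering: Ordering[T]): Iterator[T] = {
       val sorter = new ExternalSorter[T, Unit, Unit](
         context = TaskContext.get(),  // needs an active task context
         aggregator = None,            // no map-side combine, we only need the sort
         partitioner = None,           // single output partition
         ordering = Some(ordering))    // spills to disk under memory pressure
       sorter.insertAll(values.map(v => (v, ())))
       sorter.iterator.map(_._1)       // drop the dummy Unit payloads
     }
   }
   ```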



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

