beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368607666
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int):
CollectTopK =
copy(inputAggBufferOffset = newInputAggBufferOffset)
}
+
+@ExpressionDescription(
+ usage = "_FUNC_(expr) - Returns the concatenated input values," +
+ " separated by the delimiter string.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+ a,b,c
+ > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+ a,b
+ > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+ a,a
+ > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS
tab(col);
+ a,b
+ > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+ a|b
+ > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+ ""
+ """,
+ group = "agg_funcs",
+ since = "4.0.0")
+case class ListAgg(
+ child: Expression,
+ delimiter: Expression = Literal.create(",", StringType),
+ orderExpression: Expression,
+ reverse: Boolean = false,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
Review Comment:
Why need change `bufferElementType`?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int):
CollectTopK =
copy(inputAggBufferOffset = newInputAggBufferOffset)
}
+
+@ExpressionDescription(
+ usage = "_FUNC_(expr) - Returns the concatenated input values," +
+ " separated by the delimiter string.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+ a,b,c
+ > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+ a,b
+ > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+ a,a
+ > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS
tab(col);
+ a,b
+ > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+ a|b
+ > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+ """,
+ group = "agg_funcs",
+ since = "4.0.0")
+case class ListAgg(
+ child: Expression,
+ delimiter: Expression = Literal.create(",", StringType),
+ orderExpression: Expression,
+ reverse: Boolean = false,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+ with BinaryLike[Expression] {
+
+ def this(child: Expression) =
+ this(child, Literal.create(",", StringType), child, false, 0, 0)
+ def this(child: Expression, delimiter: Expression) =
+ this(child, delimiter, child, false, 0, 0)
+
+ override protected def convertToBufferElement(value: Any): Any =
InternalRow.copyValue(value)
+ override def defaultResult: Option[Literal] = Option(Literal.create("",
StringType))
+
+ override protected lazy val bufferElementType: DataType = {
+ StructType(Seq(
+ StructField("value", child.dataType),
+ StructField("sortOrder", orderExpression.dataType)))
+ }
+
+ override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+ if (buffer.nonEmpty) {
+ val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+ val sorted = if (reverse) {
+
buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
Review Comment:
I think you just save the temp data into the `CollectList`'s buffer.
Then you sort the `CollectList`'s buffer here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]