beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370996202


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala:
##########
@@ -478,6 +478,7 @@ object FunctionRegistry {
     expression[Percentile]("percentile"),
     expression[Median]("median"),
     expression[Skewness]("skewness"),
+    expression[ListAgg]("listagg"),

Review Comment:
   Because `ListAgg` now extends `Collect`, please put it together with `CollectList`.
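   A rough sketch of the suggested placement (the neighboring registry entries here are assumptions, shown only for orientation):
   ```scala
   // Grouped with the other Collect-based aggregates:
   expression[CollectList]("collect_list"),
   expression[CollectSet]("collect_set"),
   expression[ListAgg]("listagg"),
   ```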



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {
+        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering.reverse)
+        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering)
+        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+      }
+      val sorted = sortFunc(buffer)
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))

Review Comment:
   `InternalRow.apply(value, orderExpression.eval(input))`
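   For context, a sketch of how `update` would read with this suggestion applied (the lines after the `InternalRow` construction are not shown in the diff above, so the tail of the method here is an assumption):
   ```scala
   override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
     val value = child.eval(input)
     if (value != null) {
       val v = if (sameExpression) {
         convertToBufferElement(value)
       } else {
         // Build the (value, sortOrder) pair directly, as suggested in this comment.
         InternalRow.apply(value, orderExpression.eval(input))
       }
       buffer += v  // assumed continuation: append the element and return the buffer
     }
     buffer
   }
   ```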



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {

Review Comment:
   Please move `sortFunc` out of `eval`.
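   A sketch of what that could look like, reusing the sorting logic from the diff above and hoisting both `ordering` and `sortFunc` to lazy vals on the class so the pattern match is resolved once rather than on every `eval` call:
   ```scala
   private lazy val ordering = PhysicalDataType.ordering(orderExpression.dataType)

   private lazy val sortFunc: mutable.ArrayBuffer[Any] => Seq[Any] = (sameExpression, reverse) match {
     case (true, true) => buffer => buffer.sorted(ordering.reverse)
     case (true, false) => buffer => buffer.sorted(ordering)
     case (false, true) => buffer =>
       buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]]
         .sortBy(_.get(1, orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse)
         .map(_.get(0, child.dataType))
     case (false, false) => buffer =>
       buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]]
         .sortBy(_.get(1, orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]])
         .map(_.get(0, child.dataType))
   }

   override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
     if (buffer.nonEmpty) {
       val sorted = sortFunc(buffer)
       UTF8String.fromString(sorted.map(_.toString)
         .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
     } else {
       UTF8String.fromString("")
     }
   }
   ```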



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,

Review Comment:
   ```suggestion
       orderExpression: Expression,
       delimiter: Expression,
   ```
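   If the parameters are reordered as suggested, the auxiliary constructors would need to follow the new order as well; a sketch (everything else unchanged):
   ```scala
   case class ListAgg(
       child: Expression,
       orderExpression: Expression,
       delimiter: Expression,
       reverse: Boolean = false,
       mutableAggBufferOffset: Int = 0,
       inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
     with BinaryLike[Expression] {

     def this(child: Expression) =
       this(child, child, Literal.create(",", StringType), false, 0, 0)
     def this(child: Expression, delimiter: Expression) =
       this(child, child, delimiter, false, 0, 0)

     // ... rest of the implementation unchanged ...
   }
   ```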



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

