Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368586674


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   I tried that. It seems that `bufferElementType` can't be changed in 
`CollectList`, but `ListAgg` needs to store the results of two expressions. This 
approach also means overriding some methods of `CollectList`, such as `eval` and 
`update`, so I don't see the value of inheriting from `CollectList`; it seems the 
only thing they would have in common is the generic base class? Please point out 
my mistake. Thanks.
   
   Here is the code I tried.
   <details>
   <summary>ListAgg</summary>
   
   ```scala
   /**
    * Draft `ListAgg` aggregate: `update` buffers one (value, sort key) row per non-null
    * input value, and `eval` sorts the buffered rows by the sort key and joins the values'
    * string forms with the delimiter.
    *
    * @param child expression producing the values to concatenate; inputs for which it
    *              evaluates to null are not buffered
    * @param delimiter expression evaluated in `eval` to obtain the separator; defaults to ","
    * @param orderExpression expression whose value is stored as the sort key of each row
    * @param reverse when true, `eval` sorts with the reversed ordering
    * @param mutableAggBufferOffset offset passed on to the helper `CollectList`
    * @param inputAggBufferOffset offset passed on to the helper `CollectList`
    */
   case class ListAgg(
       child: Expression,
       delimiter: Expression = Literal.create(",", StringType),
       orderExpression: Expression,
       reverse: Boolean = false,
       mutableAggBufferOffset: Int = 0,
       inputAggBufferOffset: Int = 0) extends 
TypedImperativeAggregate[mutable.ArrayBuffer[Any]]
     with BinaryLike[Expression] {
   
     // Auxiliary constructors: the value expression doubles as the sort key, with
     // reverse = false and both buffer offsets set to 0.
     def this(child: Expression) =
       this(child, Literal.create(",", StringType), child, false, 0, 0)
     def this(child: Expression, delimiter: Expression) =
       this(child, delimiter, child, false, 0, 0)
   
     // Helper aggregate that merge/serialize/deserialize/update delegate to (composition
     // instead of inheriting from CollectList).
     private lazy val collect = CollectList(child, mutableAggBufferOffset, 
inputAggBufferOffset)
   
   //  override protected def convertToBufferElement(value: Any): Any = 
InternalRow.copyValue(value)
     /** Default result: an empty-string literal of `StringType`. */
     override def defaultResult: Option[Literal] = Option(Literal.create("", 
StringType))
     
   // TODO: it seems hard to change bufferElementType in collect
   //  override protected lazy val bufferElementType: DataType = {
   //    StructType(Seq(
   //      StructField("value", child.dataType),
   //      StructField("sortOrder", orderExpression.dataType)))
   //  }
   
     /** Merges two buffers by delegating to the helper `CollectList`. */
     override def merge(buffer: ArrayBuffer[Any], input: ArrayBuffer[Any]): 
ArrayBuffer[Any] = {
       collect.merge(buffer, input)
     }
   
     /**
      * Serializes the buffer by delegating to the helper `CollectList`.
      *
      * NOTE(review): `update` stores (value, sort key) rows, while `collect` is built from
      * `child` alone -- confirm that its serialization round-trips this two-field layout
      * (see the TODO about bufferElementType above).
      */
     override def serialize(buffer: ArrayBuffer[Any]): Array[Byte] = {
       collect.serialize(buffer)
     }
   
     /** Deserializes a buffer by delegating to the helper `CollectList`. */
     override def deserialize(storageFormat: Array[Byte]): ArrayBuffer[Any] = {
       collect.deserialize(storageFormat)
     }
   
     /**
      * Produces the final string. A non-empty buffer is treated as rows whose field 1 is the
      * sort key and field 0 is the value: rows are sorted by field 1 (reversed ordering when
      * `reverse` is set), the values are converted with `toString`, and the results are
      * joined with the evaluated delimiter. An empty buffer yields the empty string.
      */
     override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
       if (buffer.nonEmpty) {
         // Ordering is looked up from the sort key's data type.
         val ordering = PhysicalDataType.ordering(orderExpression.dataType)
         // The two branches differ only in whether the ordering is reversed.
         val sorted = if (reverse) {
           
buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
             
orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
             child.dataType))
         } else {
           
buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
             
orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
             child.dataType))
         }
         // The delimiter is evaluated without an input row and cast to UTF8String.
         // NOTE(review): presumably the delimiter is a non-null constant -- confirm.
         UTF8String.fromString(sorted.map(_.toString)
           .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
       } else {
         UTF8String.fromString("")
       }
     }
   
     /**
      * Evaluates `child` on the input row and, when the value is non-null, appends a row of
      * (value, sort key) to the buffer; both fields go through
      * `collect.convertToBufferElement` first. Returns the same buffer.
      */
     override def update(buffer: ArrayBuffer[Any], input: InternalRow): 
ArrayBuffer[Any] = {
       val value = child.eval(input)
       // Null values are skipped; the sort key itself is not null-checked.
       if (value != null) {
         buffer += InternalRow.apply(collect.convertToBufferElement(value),
           collect.convertToBufferElement(orderExpression.eval(input)))
       }
       buffer
     }
   
     /** Starts every aggregation with an empty buffer. */
     override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = 
mutable.ArrayBuffer.empty
   
     /** Returns a copy with `mutableAggBufferOffset` replaced. */
     override def withNewMutableAggBufferOffset(
         newMutableAggBufferOffset: Int) : ImperativeAggregate =
       copy(mutableAggBufferOffset = newMutableAggBufferOffset)
   
     /** Returns a copy with `inputAggBufferOffset` replaced. */
     override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
ImperativeAggregate =
       copy(inputAggBufferOffset = newInputAggBufferOffset)
   
     // Declared non-nullable; `eval` returns an empty string rather than null.
     override def nullable: Boolean = false
   
     override def dataType: DataType = StringType
   
     // BinaryLike children: left is the value expression, right is the sort-key expression.
     override def left: Expression = child
   
     override def right: Expression = orderExpression
   
     /** Rebuilds the node with the new left/right children as `child`/`orderExpression`. */
     override protected def withNewChildrenInternal(
         newLeft: Expression,
         newRight: Expression): Expression = {
       copy(child = newLeft, orderExpression = newRight)
     }
   }
   ```
   
   </details>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to