Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2024-03-07 Thread via GitHub


github-actions[bot] closed pull request #42398: [SPARK-42746][SQL] Add the 
LISTAGG() aggregate function
URL: https://github.com/apache/spark/pull/42398





Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2024-03-06 Thread via GitHub


github-actions[bot] commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1982086020

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-11-26 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1405586741


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##
@@ -2214,6 +2214,30 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }
 
+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    if (!column.semanticEquals(sortOrder.child) && isDistinct) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("LISTAGG", column,
+        sortOrder.child)
+    }
+    val delimiter = if (ctx.delimiter != null) Literal(ctx.delimiter.getText) else Literal(",")

Review Comment:
   fixed.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-11-25 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1405287835


##
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##
@@ -158,6 +158,69 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-42746: listagg function") {
+    withTempView("df", "df2") {
+      Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")
+        .createOrReplaceTempView("df")
+      checkAnswer(
+        sql("select listagg(b) from df group by a"),
+        Row("") :: Row("b,c") :: Row("c,d") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b) from df where 1 != 1"),
+        Row("") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b, '|') from df group by a"),
+        Row("b|c") :: Row("c|d") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+        Row("a,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+        Row("b,b,a,a") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+          "OVER (PARTITION BY b) FROM df"),
+        Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b DESC) FROM df"),

Review Comment:
   Thanks @hopefulnick, sorry for the late response; let me fix it today.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-11-09 Thread via GitHub


hopefulnick commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1387915950


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##
@@ -2214,6 +2214,30 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }
 
+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    if (!column.semanticEquals(sortOrder.child) && isDistinct) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("LISTAGG", column,
+        sortOrder.child)
+    }
+    val delimiter = if (ctx.delimiter != null) Literal(ctx.delimiter.getText) else Literal(",")

Review Comment:
   `ctx.delimiter.getText` includes the surrounding quotes, like "','"
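
   A plain-Scala sketch of the problem and a possible fix; the `unquote` helper below is hypothetical and only illustrates the idea (Spark's parser has its own string-unescaping utilities that the real fix would use):

   ```scala
   // The ANTLR token text for a string literal keeps its quotes: the SQL
   // delimiter ',' arrives here as the three-character string "','".
   val rawTokenText = "','"

   // Hypothetical helper for illustration: strip the outer quotes and undo
   // SQL's doubled-quote escaping, so "','" becomes the one-character ",".
   def unquote(raw: String): String =
     raw.stripPrefix("'").stripSuffix("'").replace("''", "'")

   assert(unquote(rawTokenText) == ",")
   ```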






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-11-08 Thread via GitHub


hopefulnick commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1387425067


##
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##
@@ -158,6 +158,69 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-42746: listagg function") {
+    withTempView("df", "df2") {
+      Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")
+        .createOrReplaceTempView("df")
+      checkAnswer(
+        sql("select listagg(b) from df group by a"),
+        Row("") :: Row("b,c") :: Row("c,d") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b) from df where 1 != 1"),
+        Row("") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b, '|') from df group by a"),
+        Row("b|c") :: Row("c|d") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+        Row("a,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+        Row("b,b,a,a") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+          "OVER (PARTITION BY b) FROM df"),
+        Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b DESC) FROM df"),

Review Comment:
   When specifying a custom separator like ',', it returns "b','a','b,','a" rather than the expected result "b,a,b,a".






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-31 Thread via GitHub


Hisoka-X commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1788269081

   kindly ping @cloud-fan @MaxGekk @HyukjinKwon 





Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1778406651

   cc @cloud-fan @MaxGekk I'm okay with most of this PR. Please help review it.





Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1371068875


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {
+        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering.reverse)
+        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering)
+        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+      }
+      val sorted = sortFunc(buffer)
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))

Review Comment:
   cc @cloud-fan @MaxGekk 






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1371026410


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {
+        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering.reverse)
+        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering)
+        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+      }
+      val sorted = sortFunc(buffer)
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))

Review Comment:
   This one cannot be changed; we must copy the value with `InternalRow.copyValue` before saving it into the buffer. Please refer to https://github.com/apache/spark/pull/42398#discussion_r1349839317. Without this, every value in the buffer would end up equal to the last row's value.
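
   A self-contained sketch of the aliasing problem described above; the `ReusedRow` class below is a stand-in for Spark's reused mutable row and is purely illustrative:

   ```scala
   import scala.collection.mutable

   // Stand-in for the row object that Spark mutates in place for each input row.
   class ReusedRow { var value: String = _ }

   val row = new ReusedRow
   val buffer = mutable.ArrayBuffer.empty[ReusedRow]
   for (v <- Seq("a", "b", "c")) {
     row.value = v
     buffer += row // BUG: every entry aliases the same object
   }
   // Without a copy (what InternalRow.copyValue provides), all entries now read "c".
   assert(buffer.forall(_.value == "c"))
   ```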



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = 

Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370996202


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala:
##
@@ -478,6 +478,7 @@ object FunctionRegistry {
     expression[Percentile]("percentile"),
     expression[Median]("median"),
     expression[Skewness]("skewness"),
+    expression[ListAgg]("listagg"),

Review Comment:
   Because `ListAgg` extends `Collect` now, please put it together with `CollectList`.



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {
+        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering.reverse)
+        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering)
+        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+      }
+      val sorted = sortFunc(buffer)
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))

Review Comment:
   `InternalRow.apply(value, orderExpression.eval(input))`




Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369777027


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -36,8 +39,7 @@ import org.apache.spark.util.BoundedPriorityQueue
  * We have to store all the collected elements in memory, and so notice that too many elements
  * can cause GC paused and eventually OutOfMemory Errors.
  */
-abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T]
-  with UnaryLike[Expression] {
+abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {

Review Comment:
   Changing `Collect` looks good now.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370071343


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      val sorted = if (sameExpression) {
+        if (reverse) {
+          buffer.toSeq.sorted(ordering.reverse)
+        } else {
+          buffer.toSeq.sorted(ordering)
+        }
+      } else {
+        if (reverse) {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        } else {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+        }
+      }
+      UTF8String.fromString(sorted.map(_.toString)

Review Comment:
   done






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370056246


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      val sorted = if (sameExpression) {
+        if (reverse) {
+          buffer.toSeq.sorted(ordering.reverse)
+        } else {
+          buffer.toSeq.sorted(ordering)
+        }
+      } else {
+        if (reverse) {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        } else {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+        }
+      }
+      UTF8String.fromString(sorted.map(_.toString)

Review Comment:
   You can define a sort function like
   
   ```
   private lazy val sortFunc = (sameExpression, reverse) match {
     case (true, true) => (buffer: mutable.ArrayBuffer[Any]) => buffer.toSeq.sorted(ordering.reverse)
     ...
   }
   ```
   
   and call the function here.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370049863


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   Also added a new one: https://github.com/apache/spark/pull/42398/commits/87999f4c98799c5365efcb0cf0e83fd8f0aa3dbc






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370043175


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   here 
https://github.com/apache/spark/pull/42398/files#diff-7de447aae84ff6752e962b088c3f62e552db3b58a4be52b69ea65da82cba6c27R198-R200






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370041468


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   Is there a test case where the input column and the sort column differ?
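
   One possible shape for such a test, using the SQL syntax this PR adds (the data, view name, and expected output are illustrative assumptions, not the PR's actual test):

   ```scala
   // Aggregate column "a" but order by column "b": ascending b (1, 2, 3)
   // should emit the a-values in the order "b,c,a".
   Seq(("a", 3), ("b", 1), ("c", 2)).toDF("a", "b").createOrReplaceTempView("df5")
   checkAnswer(
     sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df5"),
     Row("b,c,a") :: Nil)
   ```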









Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369980996


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),

Review Comment:
   addressed.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369980594


##
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##
@@ -920,6 +920,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         "fieldNames" -> v1Table.schema.fieldNames.mkString(", ")))
   }
 
+  def functionAndOrderExpressionMismatchError(
+      functionName: String,
+      functionExpr: Expression,
+      orderExpr: Expression): Throwable = {
+    new AnalysisException(
+      errorClass = "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH",
+      messageParameters = Map(
+        "functionName" -> toSQLStmt(functionName),

Review Comment:
   addressed.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369978917


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   Can this prove it?
   https://github.com/apache/spark/pull/42398/files#diff-7de447aae84ff6752e962b088c3f62e552db3b58a4be52b69ea65da82cba6c27R169-R171






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369943722


##
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##
@@ -920,6 +920,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         "fieldNames" -> v1Table.schema.fieldNames.mkString(", ")))
   }
 
+  def functionAndOrderExpressionMismatchError(
+      functionName: String,
+      functionExpr: Expression,
+      orderExpr: Expression): Throwable = {
+    new AnalysisException(
+      errorClass = "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH",
+      messageParameters = Map(
+        "functionName" -> toSQLStmt(functionName),

Review Comment:
   ```suggestion
   "functionName" -> toSQLId(functionName),
   ```



##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   Please test the empty input.
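
   One way such a test could look, mirroring the suite's style above; the expected empty string follows from the `defaultResult` in the quoted `ListAgg` code (a sketch, not the PR's actual test):

   ```scala
   // Empty input: the aggregation buffer is never filled, so defaultResult
   // ("" in the quoted code) should be returned rather than null.
   val emptyDf = Seq.empty[(String, String)].toDF("a", "b")  // hypothetical fixture
   checkAnswer(
     emptyDf.select(listagg($"a")),
     Seq(Row("")))
   ```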



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),

Review Comment:
   Do we really need the default value?



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = 

Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369934056


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      val sorted = if (sameExpression) {
+        if (reverse) {
+          buffer.toSeq.sorted(ordering.reverse)
+        } else {
+          buffer.toSeq.sorted(ordering)
+        }
+      } else {
+        if (reverse) {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        } else {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+        }
+      }
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))
+      }
+      buffer += v
+    }
+    buffer
+  }
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int) : ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression

Review Comment:
   addressed all.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369925570


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##
@@ -2214,6 +2214,40 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }
 
+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    if (!column.semanticEquals(sortOrder.child) && isDistinct) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("LISTAGG", column,
+        sortOrder.child)
+    }
+    val listAgg = if (ctx.delimiter != null) {
+      sortOrder.direction match {
+        case Ascending => ListAgg(column, Literal(ctx.delimiter.getText), sortOrder.child,
+          false)
+        case Descending => ListAgg(column, Literal(ctx.delimiter.getText), sortOrder.child,
+          true)
+      }
+    } else {
+      sortOrder.direction match {
+        case Ascending => ListAgg(column, Literal(","), sortOrder.child, false)
+        case Descending => ListAgg(column, Literal(","), sortOrder.child, true)
+      }
+    }

Review Comment:
   ```
   val delimiter = ...
   val reverse = ...
   val listAgg = ListAgg(column, delimiter, sortOrder.child, reverse)
   ```
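   
   A fuller sketch of that refactor, filling in the `...` from the `visitListAgg` rule quoted above (an illustration, not the final code):
   
   ```scala
   // Assumes the ListAggContext fields shown in the diff (delimiter, sortItem).
   val delimiter = if (ctx.delimiter != null) Literal(ctx.delimiter.getText) else Literal(",")
   val reverse = sortOrder.direction == Descending
   val listAgg = ListAgg(column, delimiter, sortOrder.child, reverse)
   ```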






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369921181


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+this(child, delimiter, child, false, 0, 0)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+if (sameExpression) {
+  child.dataType
+} else {
+  StructType(Seq(
+StructField("value", child.dataType),
+StructField("sortOrder", orderExpression.dataType)))
+}
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+if (buffer.nonEmpty) {
+  val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+  val sorted = if (sameExpression) {
+if (reverse) {
+  buffer.toSeq.sorted(ordering.reverse)
+} else {
+  buffer.toSeq.sorted(ordering)
+}
+  } else {
+if (reverse) {
+  buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq
+.sortBy(_.get(1, orderExpression.dataType))(
+  ordering.asInstanceOf[Ordering[AnyRef]].reverse)
+.map(_.get(0, child.dataType))
+} else {
+  buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq
+.sortBy(_.get(1, orderExpression.dataType))(
+  ordering.asInstanceOf[Ordering[AnyRef]])
+.map(_.get(0, child.dataType))
+}
+  }
+  UTF8String.fromString(sorted.map(_.toString)
+.mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+} else {
+  UTF8String.fromString("")
+}
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+val value = child.eval(input)
+if (value != null) {
+  val v = if (sameExpression) {
+convertToBufferElement(value)
+  } else {
+InternalRow.apply(convertToBufferElement(value),
+  convertToBufferElement(orderExpression.eval(input)))
+  }
+  buffer += v
+}
+buffer
+  }
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+  newMutableAggBufferOffset: Int) : ImperativeAggregate =
+copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression

Review Comment:
   Please move these functions up front: `createAggregationBuffer`, `nullable`, `dataType`, `left` and `right`.
   You can use the other aggregate functions as a reference.
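   
   For illustration, the conventional layout in the other aggregate functions looks roughly like this (bodies elided):
   
   ```scala
   case class ListAgg(/* ... */) extends Collect[mutable.ArrayBuffer[Any]]
 with BinaryLike[Expression] {
 // Buffer creation and simple metadata first ...
 override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
 override def nullable: Boolean = false
 override def dataType: DataType = StringType
 override def left: Expression = child
 override def right: Expression = orderExpression
 // ... then update/merge/eval and the offset plumbing below.
   }
   ```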






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369844354


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   > `ListAgg` needs to save two fields.
   
   Yes, thanks for your patient reply. @beliefer 






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369777027


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -36,8 +39,7 @@ import org.apache.spark.util.BoundedPriorityQueue
  * We have to store all the collected elements in memory, and so notice that too many elements
  * can cause GC paused and eventually OutOfMemory Errors.
  */
-abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T]
-  with UnaryLike[Expression] {
+abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {

Review Comment:
   Changing `Collect` looks good now.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-24 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369774164


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   I'm sorry, I get it now: `ListAgg` needs to save two fields. If so, please continue down this path and push it forward.
   






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369609547


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   > You can try not defining `bufferElementType`.
   
   These are the problems I need to solve:
   1. If I use `CollectList` in `ListAgg`, I have to redefine `bufferElementType` in `CollectList` so that we can save both field values in each `InternalRow`. (It seems that can't be done right now.)
   2. If I don't define `bufferElementType`, it seems I can't use `CollectList` in `ListAgg`. In that case I shouldn't `extend Collect`, which avoids the `BinaryLike`/`UnaryLike` conflict.
   3. The current plan: I removed `UnaryLike` from `Collect` so that `ListAgg` can mix in `BinaryLike` and still `extend Collect` (see the sketch after this comment).
   
   So which way should I continue?
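   
   A minimal sketch of option 3, matching the diff at the top of this thread (bodies elided):
   
   ```scala
   // Collect no longer fixes its arity, so each subclass picks UnaryLike or BinaryLike.
   abstract class Collect[T <: Growable[Any] with Iterable[Any]]
 extends TypedImperativeAggregate[T] {
 // shared buffer handling ...
   }
   
   // ListAgg has two children: the aggregated value and the sort key.
   case class ListAgg(child: Expression, delimiter: Expression, orderExpression: Expression, reverse: Boolean = false)
 extends Collect[mutable.ArrayBuffer[Any]] with BinaryLike[Expression] {
 override def left: Expression = child
 override def right: Expression = orderExpression
 // update/eval as in the diff above ...
   }
   ```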
   






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369579705


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   I see. You can try not defining `bufferElementType`.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369493701


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   Thanks @beliefer . But it seems that doesn't solve the problem of also saving the order field's value. PS: you can check the code I commented earlier, which is almost the same as what you shared. https://github.com/apache/spark/pull/42398#discussion_r1368586674






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369484714


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   ```
   ListAgg(...) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]] with BinaryLike[Expression] {
 private val collectList = CollectList()
 ...
 override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
   collectList.update(...)
 }
   
 override def merge(buffer: ArrayBuffer[Any], other: Any): Any = {
   collectList.merge(...)
 }
   
 override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
   val ordering = PhysicalDataType.ordering(orderExpr.dataType)
   val sorted = // sort logic here
  
   collectList.eval(sorted)
 }
   }
   ```






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369459076


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   It's a bit difficult for me to understand. The values of the two expressions should be bound together before sorting. I don't quite understand what separate definitions mean here, or how to collect the corresponding values separately, unless two `CollectList`s are defined in `ListAgg`. Could you provide a demo, please?






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369452950


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   You can declare another field in `ListAgg` for the order expression.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368615704


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   Because the ORDER BY column can differ from the column being aggregated, we need to save both.
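   
   Concretely, the `update` in the diff above pairs the two values in a single buffer entry:
   
   ```scala
   // Each buffer element stores (aggregated value, sort key), so eval() can
   // sort by field 1 and emit field 0.
   buffer += InternalRow.apply(
 convertToBufferElement(child.eval(input)),
 convertToBufferElement(orderExpression.eval(input)))
   ```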






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368607666


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   Why do we need to change `bufferElementType`?



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+this(child, delimiter, child, false, 0, 0)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+StructType(Seq(
+  StructField("value", child.dataType),
+  StructField("sortOrder", orderExpression.dataType)))
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+if (buffer.nonEmpty) {
+  val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+  val sorted = if (reverse) {
+buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,

Review Comment:
   I think you can just save the temp data into `CollectList`'s buffer,
   then sort that buffer here.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368586674


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   I had tried that. It seems I can't change `bufferElementType` in `CollectList`, but `ListAgg` wants to save two expressions' results. This approach would also mean rewriting some `CollectList` methods such as `eval` and `update`, so I don't see the value of inheriting from `CollectList`; it seems to amount to nothing more than sharing the generic type. Please point out my mistake. Thanks.
   
   Here is the code.
   
   ListAgg
   
   ```scala
   case class ListAgg(
   child: Expression,
   delimiter: Expression = Literal.create(",", StringType),
   orderExpression: Expression,
   reverse: Boolean = false,
   mutableAggBufferOffset: Int = 0,
   inputAggBufferOffset: Int = 0) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]]
 with BinaryLike[Expression] {
   
 def this(child: Expression) =
   this(child, Literal.create(",", StringType), child, false, 0, 0)
 def this(child: Expression, delimiter: Expression) =
   this(child, delimiter, child, false, 0, 0)
   
 private lazy val collect = CollectList(child, mutableAggBufferOffset, inputAggBufferOffset)
   
   //  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
 override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
 
   // TODO seem like hard to change bufferElementType in collect
   //  override protected lazy val bufferElementType: DataType = {
   //StructType(Seq(
   //  StructField("value", child.dataType),
   //  StructField("sortOrder", orderExpression.dataType)))
   //  }
   
 override def merge(buffer: ArrayBuffer[Any], input: ArrayBuffer[Any]): ArrayBuffer[Any] = {
   collect.merge(buffer, input)
 }
   
 override def serialize(buffer: ArrayBuffer[Any]): Array[Byte] = {
   collect.serialize(buffer)
 }
   
 override def deserialize(storageFormat: Array[Byte]): ArrayBuffer[Any] = {
   collect.deserialize(storageFormat)
 }
   
 override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
   if (buffer.nonEmpty) {
 val ordering = PhysicalDataType.ordering(orderExpression.dataType)
 val sorted = if (reverse) {
   buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq
 .sortBy(_.get(1, orderExpression.dataType))(
   ordering.asInstanceOf[Ordering[AnyRef]].reverse)
 .map(_.get(0, child.dataType))
 } else {
   buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq
 .sortBy(_.get(1, orderExpression.dataType))(
   ordering.asInstanceOf[Ordering[AnyRef]])
 .map(_.get(0, child.dataType))
 }
 UTF8String.fromString(sorted.map(_.toString)
   .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
   } else {
 UTF8String.fromString("")
   }
 }
   
 override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
   val value = child.eval(input)
   if (value != null) {
 buffer += InternalRow.apply(collect.convertToBufferElement(value),
   collect.convertToBufferElement(orderExpression.eval(input)))
   }
   buffer
 }
   
 override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
   
 override def withNewMutableAggBufferOffset(
 newMutableAggBufferOffset: Int) : ImperativeAggregate =
   copy(mutableAggBufferOffset = newMutableAggBufferOffset)
   
 override def 

Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-23 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368419550


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -988,6 +988,8 @@ primaryExpression
 | name=(PERCENTILE_CONT | PERCENTILE_DISC) LEFT_PAREN percentage=valueExpression RIGHT_PAREN
 WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN
 (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)? ( OVER windowSpec)? #percentile
+| LISTAGG LEFT_PAREN setQuantifier? aggEpxr=expression (COMMA delimiter=stringLit)? RIGHT_PAREN
+WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN (OVER windowSpec)?  #listAgg

Review Comment:
   ```suggestion
   WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN (OVER windowSpec)?   #listAgg
   ```



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   The proxy pattern can also be used.
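   
   A rough sketch of that proxy shape (member names assumed; Hisoka-X posts a fuller attempt above):
   
   ```scala
   // ListAgg holds a CollectList and forwards the buffer plumbing to it,
   // instead of extending Collect itself.
   case class ListAgg(child: Expression, orderExpression: Expression /* ... */)
 extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]] with BinaryLike[Expression] {
 private lazy val collect = CollectList(child)
 override def serialize(buffer: mutable.ArrayBuffer[Any]): Array[Byte] = collect.serialize(buffer)
 override def deserialize(bytes: Array[Byte]): mutable.ArrayBuffer[Any] = collect.deserialize(bytes)
 // update/eval stay in ListAgg, since they need the sort key.
   }
   ```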



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+orderExpression: Expression,
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   I think we should avoid changing `Collect`. We can declare a `CollectList` field and have `ListAgg` extend `BinaryLike[Expression]`.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-22 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368090659


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   Thanks @beliefer , I made some updates. Please check again. Thanks.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-13 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1359157402


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   You can compose them instead. Please refer to `PercentileCont` or `RegrSlope`.
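   
   For reference, `PercentileCont` composes `Percentile` roughly like this (simplified from memory; check the actual source):
   
   ```scala
   // The outer function is a thin wrapper that planning replaces with the
   // aggregate it composes.
   case class PercentileCont(left: Expression, right: Expression, reverse: Boolean = false)
 extends AggregateFunction with RuntimeReplaceableAggregate with BinaryLike[Expression] {
 private lazy val percentile = new Percentile(left, right, reverse)
 override def replacement: Expression = percentile
 // ...
   }
   ```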






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-11 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1355031438


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   Hi @holdenk @beliefer , I did some updates for the `extends Collect` approach, but there are some problems that bother me. `ListAgg` needs to save two expressions separately, because the expressions used for aggregation and sorting may be different. However, `Collect` mixes in `UnaryLike`, which allows only one child, so the other expression cannot be resolved. Any suggestions?






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-09 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1351167115


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   I think this issue can be avoided if we refactor to use `CollectList` or extend `Collect`.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-09 Thread via GitHub


holdenk commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1351071287


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), 
false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, 
false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   So the idea with that is to bubble up the distinct to avoid a shuffle?






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-09 Thread via GitHub


holdenk commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1351070023


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(

Review Comment:
   Similarly, let's put this in collect.scala with the other collect friends.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349881268


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(

Review Comment:
   `ListAgg` stores values too. I think reusing `CollectList` or extending
`Collect` is better.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349839317


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), 
false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, 
false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],
+  input: InternalRow): OpenHashMap[AnyRef, Long] = {
+val value = child.eval(input)
+if (value != null) {
+  val key = InternalRow.copyValue(value)

Review Comment:
   It could be a `UTF8String`; without this copy, the map key would change after
being put into the map. Other places, such as `CollectList`, use the same approach.
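
   A self-contained analogy in plain Scala (not Spark types) of the buffer-reuse
hazard: `UnsafeRow` reuses its backing bytes across input rows, so an uncopied
value aliases memory the next row will overwrite.

      val backing = "a".getBytes("UTF-8")            // stands in for the reused row buffer
      def readValue(): Array[Byte] = backing         // no copy: aliases the buffer

      val uncopied = readValue()
      val copied   = readValue().clone()             // InternalRow.copyValue plays this role

      backing(0) = 'b'.toByte                        // the "next row" overwrites the buffer

      assert(new String(uncopied, "UTF-8") == "b")   // a key stored uncopied has silently changed
      assert(new String(copied, "UTF-8") == "a")     // the copied key is stable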






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349837194


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), 
false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, 
false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   So use a List when `DISTINCT` is absent and a Set when `DISTINCT` is specified.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349836810


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), 
false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, 
false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],
+  input: InternalRow): OpenHashMap[AnyRef, Long] = {
+val value = child.eval(input)
+if (value != null) {
+  val key = InternalRow.copyValue(value)

Review Comment:
   The key is cached here and is not tied to the `InternalRow`.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349836243


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(

Review Comment:
   That would work. But it seems `CollectList` would use more memory because
it stores every value rather than the number of occurrences? Should I do that?
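
   An illustrative toy comparison in plain Scala (hypothetical input, not the
PR's code) of the trade-off: a list buffer keeps every duplicate, while a
count map keeps each distinct value once.

      val input = Seq("a", "a", "a", "b")

      val listBuffer  = input                                    // 4 entries: a, a, a, b
      val countBuffer = input.groupBy(identity).map {
        case (v, occurrences) => v -> occurrences.size.toLong    // 2 entries: a -> 3, b -> 1
      }

      assert(listBuffer.size == 4)
      assert(countBuffer == Map("a" -> 3L, "b" -> 1L))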






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349833814


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), 
false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, 
false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   A Set cannot store duplicate values. The map is there to make sure the call
works correctly without the `DISTINCT` expression, e.g. `a` and `a` should
produce `a,a`, not `a`.
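
   A minimal sketch in plain Scala (a `mutable.Map` standing in for the
`OpenHashMap` buffer, not the PR's actual code) of how one value-to-count map
serves both modes: duplicates are preserved without `DISTINCT` and collapsed
with it.

      import scala.collection.mutable

      // value -> number of occurrences, as accumulated by update()
      def concat(buffer: mutable.Map[String, Long],
                 delimiter: String,
                 distinct: Boolean): String = {
        val values =
          if (distinct) buffer.keys.toSeq.sorted
          else buffer.toSeq.sortBy(_._1).flatMap { case (v, n) => Seq.fill(n.toInt)(v) }
        values.mkString(delimiter)
      }

      val buf = mutable.Map("a" -> 2L, "b" -> 1L)
      assert(concat(buf, ",", distinct = false) == "a,a,b")
      assert(concat(buf, ",", distinct = true)  == "a,b")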






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349833053


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), 
false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, 
false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],
+  input: InternalRow): OpenHashMap[AnyRef, Long] = {
+val value = child.eval(input)
+if (value != null) {
+  val key = InternalRow.copyValue(value)

Review Comment:
   The value read from the `InternalRow` will be overwritten later, so we must copy it.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349832245


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), 
false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, 
false, 0, 0)
+
+  override def update(
+  buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   It seems we only need a set or a list here.



Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349829226


##
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##
@@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
 }
   }
 
+  test("SPARK-42746: listagg function") {
+withTempView("df", "df2") {
+  Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, 
null)).toDF("a", "b")
+.createOrReplaceTempView("df")
+  checkAnswer(
+sql("select listagg(b) from df group by a"),
+Row("b,c") :: Row("c,d") :: Row(null) :: Nil)
+
+  checkAnswer(
+sql("select listagg(b, '|') from df group by a"),
+Row("b|c") :: Row("c|d") :: Row(null) :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) FROM df"),
+Row("a,a,b,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+Row("a,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+Row("a,a,b,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+Row("b,b,a,a") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+  "OVER (PARTITION BY b) FROM df"),
+Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil)
+
+  checkError(
+exception = intercept[AnalysisException] {
+  sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df")

Review Comment:
   oh, this is a mistake, let me fix it.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349828939


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   a,a
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(

Review Comment:
   On second thought, can we reuse `CollectList`, which is an existing
aggregate function?






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349828105


##
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##
@@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
 }
   }
 
+  test("SPARK-42746: listagg function") {
+withTempView("df", "df2") {
+  Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, 
null)).toDF("a", "b")
+.createOrReplaceTempView("df")
+  checkAnswer(
+sql("select listagg(b) from df group by a"),
+Row("b,c") :: Row("c,d") :: Row(null) :: Nil)
+
+  checkAnswer(
+sql("select listagg(b, '|') from df group by a"),
+Row("b|c") :: Row("c|d") :: Row(null) :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) FROM df"),
+Row("a,a,b,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+Row("a,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+Row("a,a,b,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+Row("b,b,a,a") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+  "OVER (PARTITION BY b) FROM df"),
+Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil)
+
+  checkError(
+exception = intercept[AnalysisException] {
+  sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df")

Review Comment:
   But this test case doesn't use the `DISTINCT` keyword.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349825602


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);

Review Comment:
   Done






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349822359


##
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##
@@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
 }
   }
 
+  test("SPARK-42746: listagg function") {
+withTempView("df", "df2") {
+  Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, 
null)).toDF("a", "b")
+.createOrReplaceTempView("df")
+  checkAnswer(
+sql("select listagg(b) from df group by a"),
+Row("b,c") :: Row("c,d") :: Row(null) :: Nil)
+
+  checkAnswer(
+sql("select listagg(b, '|') from df group by a"),
+Row("b|c") :: Row("c|d") :: Row(null) :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) FROM df"),
+Row("a,a,b,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+Row("a,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+Row("a,a,b,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+Row("b,b,a,a") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+  "OVER (PARTITION BY b) FROM df"),
+Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil)
+
+  checkError(
+exception = intercept[AnalysisException] {
+  sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df")

Review Comment:
   Yes, that's why I added the `FUNCTION_AND_ORDER_EXPRESSION_MISMATCH` error for this.






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


Hisoka-X commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1752261690

   > @Hisoka-X Is `LISTAGG` an ANSI standard?
   
   Yes, see https://modern-sql.com/feature/listagg
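
   For reference, the ANSI shape described on that page looks like this in
Spark SQL terms. A sketch only: `tab` and `col` are illustrative names, and a
`SparkSession` named `spark` is assumed.

      import spark.implicits._  // assumes a SparkSession named `spark`

      Seq("b", "a", "c").toDF("col").createOrReplaceTempView("tab")

      // ANSI form: LISTAGG(<expr> [, <delimiter>]) WITHIN GROUP (ORDER BY <sort expr>)
      spark.sql("SELECT LISTAGG(col, ', ') WITHIN GROUP (ORDER BY col) FROM tab").show()
      // expected single-row result: a, b, c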





Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349818074


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);

Review Comment:
   Could you add an example with duplicate values, both with and without DISTINCT?






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349817389


##
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##
@@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
 }
   }
 
+  test("SPARK-42746: listagg function") {
+withTempView("df", "df2") {
+  Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, 
null)).toDF("a", "b")
+.createOrReplaceTempView("df")
+  checkAnswer(
+sql("select listagg(b) from df group by a"),
+Row("b,c") :: Row("c,d") :: Row(null) :: Nil)
+
+  checkAnswer(
+sql("select listagg(b, '|') from df group by a"),
+Row("b|c") :: Row("c|d") :: Row(null) :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) FROM df"),
+Row("a,a,b,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+Row("a,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+Row("a,a,b,b") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+Row("b,b,a,a") :: Nil)
+
+  checkAnswer(
+sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+  "OVER (PARTITION BY b) FROM df"),
+Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil)
+
+  checkError(
+exception = intercept[AnalysisException] {
+  sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df")

Review Comment:
   Is the behavior correct? According to
https://docs.snowflake.com/en/sql-reference/functions/listagg, the mismatch
error is raised when you specify different columns for `DISTINCT` and `WITHIN GROUP`.
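
   For concreteness, the shape that doc rejects is sketched below in Spark SQL
terms. Illustrative only: a view `df` with columns `a` and `b` is assumed, and
whether Spark should mirror Snowflake here is exactly the open question.

      // Per the Snowflake doc, this should fail: DISTINCT is on `a`,
      // but WITHIN GROUP orders by the different column `b`.
      spark.sql("SELECT LISTAGG(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df")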



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);

Review Comment:
   Could you add an example with duplicate values?






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


beliefer commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1752242004

   @Hisoka-X Is `LISTAGG` an ANSI standard?





Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349810729


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), 
false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, 
false, 0, 0)

Review Comment:
   It is used for the SQL expression; it will be invoked when the function is
called from SQL, e.g. `listagg(b)`.
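
   A self-contained sketch (simplified and hypothetical, not Spark's actual
`FunctionRegistry`) of why arity-matched auxiliary constructors matter: the
registry instantiates the expression reflectively by matching the number of
SQL arguments to a constructor's parameter count, so Scala default parameter
values are invisible to it.

      class Agg(val child: String, val delimiter: String) {
        def this(child: String) = this(child, ",")              // matches listagg(b)
      }

      def construct(args: Seq[AnyRef]): Agg = {
        val ctor = classOf[Agg].getConstructors
          .find(_.getParameterCount == args.size)
          .getOrElse(throw new IllegalArgumentException(s"no ${args.size}-arg constructor"))
        ctor.newInstance(args: _*).asInstanceOf[Agg]
      }

      assert(construct(Seq("b")).delimiter == ",")              // 1-arg SQL call
      assert(construct(Seq("b", "|")).delimiter == "|")         // 2-arg SQL call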






Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2023-10-08 Thread via GitHub


khalidmammadov commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349758338


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+" separated by the delimiter string.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a,b,c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+   a,b
+  > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+   a|b
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal.create(",", StringType),
+reverse: Boolean = false,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), 
false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, 
false, 0, 0)

Review Comment:
   What is the value of these two auxiliary constructors when default class
parameters are also supplied with the same default values?


