Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
github-actions[bot] closed pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function URL: https://github.com/apache/spark/pull/42398 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
github-actions[bot] commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1982086020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1405586741

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala: ##
@@ -2214,6 +2214,30 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }

+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    if (!column.semanticEquals(sortOrder.child) && isDistinct) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("LISTAGG", column,
+        sortOrder.child)
+    }
+    val delimiter = if (ctx.delimiter != null) Literal(ctx.delimiter.getText) else Literal(",")

Review Comment:
Fixed.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1405287835 ## sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala: ## @@ -158,6 +158,69 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } + test("SPARK-42746: listagg function") { +withTempView("df", "df2") { + Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b") +.createOrReplaceTempView("df") + checkAnswer( +sql("select listagg(b) from df group by a"), +Row("") :: Row("b,c") :: Row("c,d") :: Nil) + + checkAnswer( +sql("select listagg(b) from df where 1 != 1"), +Row("") :: Nil) + + checkAnswer( +sql("select listagg(b, '|') from df group by a"), +Row("b|c") :: Row("c|d") :: Row("") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(DISTINCT a) FROM df"), +Row("a,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"), +Row("b,b,a,a") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " + + "OVER (PARTITION BY b) FROM df"), +Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row("") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b DESC) FROM df"), Review Comment: Thanks @hopefulnick , sorry for late response, let me fix it today. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
hopefulnick commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1387915950

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala: ##
@@ -2214,6 +2214,30 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }

+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    if (!column.semanticEquals(sortOrder.child) && isDistinct) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("LISTAGG", column,
+        sortOrder.child)
+    }
+    val delimiter = if (ctx.delimiter != null) Literal(ctx.delimiter.getText) else Literal(",")

Review Comment:
`ctx.delimiter.getText` returns the token text including its quotes, e.g. "','" instead of ",".
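The review comment above points out that the raw token text still carries the SQL quotes. A minimal standalone sketch of the effect, assuming a hypothetical `unquote` helper that is not a Spark API (a real parser's string-literal handling would also process escape sequences, which this ignores):

```scala
object DelimiterQuotes {
  // Hypothetical helper for illustration only: drops one pair of matching
  // surrounding quotes. It does not handle escape sequences.
  def unquote(raw: String): String =
    if (raw.length >= 2 &&
        (raw.head == '\'' || raw.head == '"') &&
        raw.last == raw.head) {
      raw.substring(1, raw.length - 1)
    } else {
      raw
    }

  // Joins the values with the delimiter, as the aggregate does at the end.
  def join(values: Seq[String], delimiter: String): String =
    values.mkString(delimiter)

  def main(args: Array[String]): Unit = {
    // Token text as it would appear for the SQL literal ',' (quotes included).
    val rawTokenText = "','"
    println(join(Seq("b", "a", "b", "a"), rawTokenText))          // b','a','b','a
    println(join(Seq("b", "a", "b", "a"), unquote(rawTokenText))) // b,a,b,a
  }
}
```

Using the quoted text directly as the delimiter is what produces the stray quote characters between elements; stripping the quotes first gives the intended separator.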
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
hopefulnick commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1387425067

## sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala: ##
@@ -158,6 +158,69 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }

+  test("SPARK-42746: listagg function") {
+    withTempView("df", "df2") {
+      Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")
+        .createOrReplaceTempView("df")
+      checkAnswer(
+        sql("select listagg(b) from df group by a"),
+        Row("") :: Row("b,c") :: Row("c,d") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b) from df where 1 != 1"),
+        Row("") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b, '|') from df group by a"),
+        Row("b|c") :: Row("c|d") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+        Row("a,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+        Row("b,b,a,a") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+          "OVER (PARTITION BY b) FROM df"),
+        Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b DESC) FROM df"),

Review Comment:
When a custom separator such as ',' is specified, this query returns "b','a','b,','a" instead of the expected result "b,a,b,a".
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1788269081

kindly ping @cloud-fan @MaxGekk @HyukjinKwon
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1778406651

cc @cloud-fan @MaxGekk I'm okay with most of this PR. Please help review it.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1371068875 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,115 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression, +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with BinaryLike[Expression] { + + def this(child: Expression) = +this(child, Literal.create(",", StringType), child, false, 0, 0) + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, child, false, 0, 0) + + override def nullable: Boolean = false + + override def dataType: DataType = StringType + + override def left: Expression = child + + override def right: Expression = orderExpression + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset( + newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def 
withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + private lazy val sameExpression = orderExpression.semanticEquals(child) + + override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) + override def defaultResult: Option[Literal] = Option(Literal.create("", StringType)) + + override protected lazy val bufferElementType: DataType = { +if (sameExpression) { + child.dataType +} else { + StructType(Seq( +StructField("value", child.dataType), +StructField("sortOrder", orderExpression.dataType))) +} + } + + override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { +if (buffer.nonEmpty) { + val ordering = PhysicalDataType.ordering(orderExpression.dataType) + lazy val sortFunc = (sameExpression, reverse) match { +case (true, true) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.sorted(ordering.reverse) +case (true, false) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.sorted(ordering) +case (false, true) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0, +child.dataType)) +case (false, false) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0, +child.dataType)) + } + val sorted = sortFunc(buffer) + UTF8String.fromString(sorted.map(_.toString) +.mkString(delimiter.eval().asInstanceOf[UTF8String].toString)) +} else { + UTF8String.fromString("") +} + } + + override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = { +val value = child.eval(input) +if (value != null) { + val v = if (sameExpression) { +convertToBufferElement(value) + } else { +InternalRow.apply(convertToBufferElement(value), + 
convertToBufferElement(orderExpression.eval(input)))

Review Comment:
cc @cloud-fan @MaxGekk
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1371026410 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,115 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression, +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with BinaryLike[Expression] { + + def this(child: Expression) = +this(child, Literal.create(",", StringType), child, false, 0, 0) + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, child, false, 0, 0) + + override def nullable: Boolean = false + + override def dataType: DataType = StringType + + override def left: Expression = child + + override def right: Expression = orderExpression + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset( + newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def 
withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + private lazy val sameExpression = orderExpression.semanticEquals(child) + + override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) + override def defaultResult: Option[Literal] = Option(Literal.create("", StringType)) + + override protected lazy val bufferElementType: DataType = { +if (sameExpression) { + child.dataType +} else { + StructType(Seq( +StructField("value", child.dataType), +StructField("sortOrder", orderExpression.dataType))) +} + } + + override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { +if (buffer.nonEmpty) { + val ordering = PhysicalDataType.ordering(orderExpression.dataType) + lazy val sortFunc = (sameExpression, reverse) match { +case (true, true) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.sorted(ordering.reverse) +case (true, false) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.sorted(ordering) +case (false, true) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0, +child.dataType)) +case (false, false) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0, +child.dataType)) + } + val sorted = sortFunc(buffer) + UTF8String.fromString(sorted.map(_.toString) +.mkString(delimiter.eval().asInstanceOf[UTF8String].toString)) +} else { + UTF8String.fromString("") +} + } + + override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = { +val value = child.eval(input) +if (value != null) { + val v = if (sameExpression) { +convertToBufferElement(value) + } else { +InternalRow.apply(convertToBufferElement(value), + 
convertToBufferElement(orderExpression.eval(input)))

Review Comment:
This cannot be changed: each value must be passed through `InternalRow.copyValue` before it is saved into the buffer. Please refer to https://github.com/apache/spark/pull/42398#discussion_r1349839317 . Without the copy, every element in the buffer ends up holding the value of the last row.

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ##
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset =
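The reply above says values must be copied before they are buffered. A standalone sketch of why that matters, under the assumption that the producer reuses a single mutable object for every row; `MutableCell` is a hypothetical stand-in for such a reused value and is not a Spark class:

```scala
import scala.collection.mutable.ArrayBuffer

object CopyBeforeBuffering {
  // Hypothetical stand-in for a value object that the producer reuses per row.
  final class MutableCell(var value: String) {
    def copyCell(): MutableCell = new MutableCell(value)
  }

  // Buffers each incoming cell, optionally copying it first, and returns
  // the values seen through the buffer once all input has been consumed.
  def collect(inputs: Seq[String], copy: Boolean): List[String] = {
    val reused = new MutableCell("")
    val buffer = ArrayBuffer.empty[MutableCell]
    inputs.foreach { in =>
      reused.value = in // the producer overwrites the same object in place
      buffer += (if (copy) reused.copyCell() else reused)
    }
    buffer.map(_.value).toList
  }

  def main(args: Array[String]): Unit = {
    println(collect(Seq("a", "b", "c"), copy = false)) // List(c, c, c)
    println(collect(Seq("a", "b", "c"), copy = true))  // List(a, b, c)
  }
}
```

Without the copy, every buffer slot aliases the same object, so all elements show whatever the last row wrote, which matches the symptom described in the reply.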
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1370996202 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala: ## @@ -478,6 +478,7 @@ object FunctionRegistry { expression[Percentile]("percentile"), expression[Median]("median"), expression[Skewness]("skewness"), +expression[ListAgg]("listagg"), Review Comment: Because `ListAgg` extends `Collect` now, please put it with `CollectList` together. ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,115 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression, +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with BinaryLike[Expression] { + + def this(child: Expression) = +this(child, Literal.create(",", StringType), child, false, 0, 0) + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, child, false, 0, 0) + + override def nullable: Boolean = false + + override def dataType: DataType = StringType + + override def 
left: Expression = child + + override def right: Expression = orderExpression + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset( + newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + private lazy val sameExpression = orderExpression.semanticEquals(child) + + override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) + override def defaultResult: Option[Literal] = Option(Literal.create("", StringType)) + + override protected lazy val bufferElementType: DataType = { +if (sameExpression) { + child.dataType +} else { + StructType(Seq( +StructField("value", child.dataType), +StructField("sortOrder", orderExpression.dataType))) +} + } + + override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { +if (buffer.nonEmpty) { + val ordering = PhysicalDataType.ordering(orderExpression.dataType) + lazy val sortFunc = (sameExpression, reverse) match { +case (true, true) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.sorted(ordering.reverse) +case (true, false) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.sorted(ordering) +case (false, true) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0, +child.dataType)) +case (false, false) => (buffer: mutable.ArrayBuffer[Any]) => + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0, +child.dataType)) + } + val sorted = sortFunc(buffer) + UTF8String.fromString(sorted.map(_.toString) 
+.mkString(delimiter.eval().asInstanceOf[UTF8String].toString)) +} else { + UTF8String.fromString("") +} + } + + override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = { +val value = child.eval(input) +if (value != null) { + val v = if (sameExpression) { +convertToBufferElement(value) + } else { +InternalRow.apply(convertToBufferElement(value), + convertToBufferElement(orderExpression.eval(input))) Review Comment: `InternalRow.apply(value, orderExpression.eval(input))` ##
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369777027

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ##
@@ -36,8 +39,7 @@ import org.apache.spark.util.BoundedPriorityQueue
  * We have to store all the collected elements in memory, and so notice that too many elements
  * can cause GC paused and eventually OutOfMemory Errors.
  */
-abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T]
-  with UnaryLike[Expression] {
+abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {

Review Comment:
The change to `Collect` looks good now.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1370071343 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,117 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression, +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with BinaryLike[Expression] { + + def this(child: Expression) = +this(child, Literal.create(",", StringType), child, false, 0, 0) + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, child, false, 0, 0) + + override def nullable: Boolean = false + + override def dataType: DataType = StringType + + override def left: Expression = child + + override def right: Expression = orderExpression + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset( + newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def 
withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + private lazy val sameExpression = orderExpression.semanticEquals(child) + + override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) + override def defaultResult: Option[Literal] = Option(Literal.create("", StringType)) + + override protected lazy val bufferElementType: DataType = { +if (sameExpression) { + child.dataType +} else { + StructType(Seq( +StructField("value", child.dataType), +StructField("sortOrder", orderExpression.dataType))) +} + } + + override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { +if (buffer.nonEmpty) { + val ordering = PhysicalDataType.ordering(orderExpression.dataType) + val sorted = if (sameExpression) { +if (reverse) { + buffer.toSeq.sorted(ordering.reverse) +} else { + buffer.toSeq.sorted(ordering) +} + } else { +if (reverse) { + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0, +child.dataType)) +} else { + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0, +child.dataType)) +} + } + UTF8String.fromString(sorted.map(_.toString) Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1370056246 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,117 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression, +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with BinaryLike[Expression] { + + def this(child: Expression) = +this(child, Literal.create(",", StringType), child, false, 0, 0) + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, child, false, 0, 0) + + override def nullable: Boolean = false + + override def dataType: DataType = StringType + + override def left: Expression = child + + override def right: Expression = orderExpression + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset( + newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def 
withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + private lazy val sameExpression = orderExpression.semanticEquals(child) + + override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) + override def defaultResult: Option[Literal] = Option(Literal.create("", StringType)) + + override protected lazy val bufferElementType: DataType = { +if (sameExpression) { + child.dataType +} else { + StructType(Seq( +StructField("value", child.dataType), +StructField("sortOrder", orderExpression.dataType))) +} + } + + override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { +if (buffer.nonEmpty) { + val ordering = PhysicalDataType.ordering(orderExpression.dataType) + val sorted = if (sameExpression) { +if (reverse) { + buffer.toSeq.sorted(ordering.reverse) +} else { + buffer.toSeq.sorted(ordering) +} + } else { +if (reverse) { + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0, +child.dataType)) +} else { + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0, +child.dataType)) +} + } + UTF8String.fromString(sorted.map(_.toString) Review Comment: You can define a sort function like ``` private lazy val sortFunc = (sameExpression, reverse) match { case (true, true) => (buffer: mutable.ArrayBuffer[Any]) => buffer.toSeq.sorted(ordering.reverse) ... } ``` and call the function here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
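The suggestion above, choosing the sort routine once from `(sameExpression, reverse)` instead of branching inside `eval`, can be sketched without Spark. This is only a simplified model: `SortFuncSketch`, `Pair`, `sortValues` and `sortPairs` are hypothetical names, and plain `String`/`Int` values stand in for the buffered values and sort keys of the real expression.

```scala
// Hypothetical, Spark-free model: select the sort routine up front
// rather than nesting if/else branches at evaluation time.
object SortFuncSketch {
  // Stand-in for a buffered row when value and sort key differ: (value, sortKey).
  type Pair = (String, Int)

  // Case 1: the aggregated value is also the sort key.
  def sortValues(reverse: Boolean): Seq[String] => Seq[String] =
    if (reverse) (xs: Seq[String]) => xs.sorted(Ordering[String].reverse)
    else (xs: Seq[String]) => xs.sorted

  // Case 2: sort by a separate key, then project the value.
  def sortPairs(reverse: Boolean): Seq[Pair] => Seq[String] = {
    val ord = if (reverse) Ordering[Int].reverse else Ordering[Int]
    (rows: Seq[Pair]) => rows.sortBy(_._2)(ord).map(_._1)
  }

  def main(args: Array[String]): Unit = {
    println(sortValues(reverse = false)(Seq("b", "c", "a")).mkString(","))
    println(sortPairs(reverse = true)(Seq(("x", 1), ("y", 3), ("z", 2))).mkString(","))
  }
}
```

In the real expression the four cases would presumably live in one `lazy val`, as the comment proposes, so the choice is made once per aggregate rather than once per call.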
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1370049863 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala: ## @@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest ) } + test("listagg function") { +// normal case +val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df.selectExpr("listagg(a)", "listagg(b)"), + Seq(Row("a,b,c", "b,c,d")) +) +checkAnswer( + df.select(listagg($"a"), listagg($"b")), + Seq(Row("a,b,c", "b,c,d")) +) + +// distinct case +val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b") +checkAnswer( + df2.select(listagg_distinct($"a"), listagg_distinct($"b")), + Seq(Row("a,b", "b,d")) +) + +// null case +val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c") +checkAnswer( + df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"), +listagg($"c")), + Seq(Row("a", "a,a", "b", "b,b", "")) +) + +// custom delimiter +val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"), + Seq(Row("a|b|c", "b|c|d")) +) + } Review Comment: also add new one https://github.com/apache/spark/pull/42398/commits/87999f4c98799c5365efcb0cf0e83fd8f0aa3dbc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1370043175 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala: ## @@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest ) } + test("listagg function") { +// normal case +val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df.selectExpr("listagg(a)", "listagg(b)"), + Seq(Row("a,b,c", "b,c,d")) +) +checkAnswer( + df.select(listagg($"a"), listagg($"b")), + Seq(Row("a,b,c", "b,c,d")) +) + +// distinct case +val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b") +checkAnswer( + df2.select(listagg_distinct($"a"), listagg_distinct($"b")), + Seq(Row("a,b", "b,d")) +) + +// null case +val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c") +checkAnswer( + df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"), +listagg($"c")), + Seq(Row("a", "a,a", "b", "b,b", "")) +) + +// custom delimiter +val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"), + Seq(Row("a|b|c", "b|c|d")) +) + } Review Comment: here https://github.com/apache/spark/pull/42398/files#diff-7de447aae84ff6752e962b088c3f62e552db3b58a4be52b69ea65da82cba6c27R198-R200 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1370041468 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala: ## @@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest ) } + test("listagg function") { +// normal case +val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df.selectExpr("listagg(a)", "listagg(b)"), + Seq(Row("a,b,c", "b,c,d")) +) +checkAnswer( + df.select(listagg($"a"), listagg($"b")), + Seq(Row("a,b,c", "b,c,d")) +) + +// distinct case +val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b") +checkAnswer( + df2.select(listagg_distinct($"a"), listagg_distinct($"b")), + Seq(Row("a,b", "b,d")) +) + +// null case +val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c") +checkAnswer( + df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"), +listagg($"c")), + Seq(Row("a", "a,a", "b", "b,b", "")) +) + +// custom delimiter +val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"), + Seq(Row("a|b|c", "b|c|d")) +) + } Review Comment: Is there a test case where the input column and the sort column are different?
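As a rough illustration of what such a test would exercise, here is a Spark-free reference model of the semantics visible in this thread: NULL inputs are skipped, values are ordered by a separate sort key, and empty input yields an empty string. `ListAggReferenceModel` and its parameters are hypothetical, and the `Int` sort key is an assumption made only to keep the sketch small.

```scala
// Hypothetical reference model, not the Spark implementation.
object ListAggReferenceModel {
  // Each row is (value, sortKey); a None value models a SQL NULL.
  def listAgg(
      rows: Seq[(Option[String], Int)],
      delimiter: String = ",",
      reverse: Boolean = false): String = {
    val ord = if (reverse) Ordering[Int].reverse else Ordering[Int]
    rows
      .collect { case (Some(v), k) => (v, k) } // drop NULL values, keep their keys out
      .sortBy(_._2)(ord)                       // order by the separate sort column
      .map(_._1)                               // project the aggregated value
      .mkString(delimiter)                     // empty input gives ""
  }
}
```

A test against the real function could then compare its output with this model for a few inputs where the value order and the key order disagree.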
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369980996 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,117 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), Review Comment: addressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369980594 ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala: ## @@ -920,6 +920,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "fieldNames" -> v1Table.schema.fieldNames.mkString(", "))) } + def functionAndOrderExpressionMismatchError( + functionName: String, + functionExpr: Expression, + orderExpr: Expression): Throwable = { +new AnalysisException( + errorClass = "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH", + messageParameters = Map( +"functionName" -> toSQLStmt(functionName), Review Comment: addressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369978917 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala: ## @@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest ) } + test("listagg function") { +// normal case +val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df.selectExpr("listagg(a)", "listagg(b)"), + Seq(Row("a,b,c", "b,c,d")) +) +checkAnswer( + df.select(listagg($"a"), listagg($"b")), + Seq(Row("a,b,c", "b,c,d")) +) + +// distinct case +val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b") +checkAnswer( + df2.select(listagg_distinct($"a"), listagg_distinct($"b")), + Seq(Row("a,b", "b,d")) +) + +// null case +val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c") +checkAnswer( + df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"), +listagg($"c")), + Seq(Row("a", "a,a", "b", "b,b", "")) +) + +// custom delimiter +val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"), + Seq(Row("a|b|c", "b|c|d")) +) + } Review Comment: Does this test cover that case? https://github.com/apache/spark/pull/42398/files#diff-7de447aae84ff6752e962b088c3f62e552db3b58a4be52b69ea65da82cba6c27R169-R171
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369943722 ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala: ## @@ -920,6 +920,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "fieldNames" -> v1Table.schema.fieldNames.mkString(", "))) } + def functionAndOrderExpressionMismatchError( + functionName: String, + functionExpr: Expression, + orderExpr: Expression): Throwable = { +new AnalysisException( + errorClass = "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH", + messageParameters = Map( +"functionName" -> toSQLStmt(functionName), Review Comment: ```suggestion "functionName" -> toSQLId(functionName), ``` ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala: ## @@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest ) } + test("listagg function") { +// normal case +val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df.selectExpr("listagg(a)", "listagg(b)"), + Seq(Row("a,b,c", "b,c,d")) +) +checkAnswer( + df.select(listagg($"a"), listagg($"b")), + Seq(Row("a,b,c", "b,c,d")) +) + +// distinct case +val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b") +checkAnswer( + df2.select(listagg_distinct($"a"), listagg_distinct($"b")), + Seq(Row("a,b", "b,d")) +) + +// null case +val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c") +checkAnswer( + df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"), +listagg($"c")), + Seq(Row("a", "a,a", "b", "b,b", "")) +) + +// custom delimiter +val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") +checkAnswer( + df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"), + Seq(Row("a|b|c", "b|c|d")) +) + } Review Comment: Please test the empty input. 
## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,117 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), Review Comment: Do we really need the default value? 
## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,117 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with BinaryLike[Expression] { + + def this(child: Expression) = +this(child, Literal.create(",", StringType), child, false, 0, 0) + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, child, false, 0, 0) + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] =
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369934056 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,117 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with BinaryLike[Expression] { + + def this(child: Expression) = +this(child, Literal.create(",", StringType), child, false, 0, 0) + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, child, false, 0, 0) + + private lazy val sameExpression = orderExpression.semanticEquals(child) + + override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) + override def defaultResult: Option[Literal] = Option(Literal.create("", StringType)) + + override protected lazy val bufferElementType: DataType = { +if (sameExpression) { + child.dataType +} else { + StructType(Seq( +StructField("value", child.dataType), 
+StructField("sortOrder", orderExpression.dataType))) +} + } + + override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { +if (buffer.nonEmpty) { + val ordering = PhysicalDataType.ordering(orderExpression.dataType) + val sorted = if (sameExpression) { +if (reverse) { + buffer.toSeq.sorted(ordering.reverse) +} else { + buffer.toSeq.sorted(ordering) +} + } else { +if (reverse) { + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0, +child.dataType)) +} else { + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0, +child.dataType)) +} + } + UTF8String.fromString(sorted.map(_.toString) +.mkString(delimiter.eval().asInstanceOf[UTF8String].toString)) +} else { + UTF8String.fromString("") +} + } + + override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = { +val value = child.eval(input) +if (value != null) { + val v = if (sameExpression) { +convertToBufferElement(value) + } else { +InternalRow.apply(convertToBufferElement(value), + convertToBufferElement(orderExpression.eval(input))) + } + buffer += v +} +buffer + } + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset( + newMutableAggBufferOffset: Int) : ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def nullable: Boolean = false + + override def dataType: DataType = StringType + + override def left: Expression = child + + override def right: Expression = orderExpression Review Comment: addressed all. -- This is an automated message from the Apache Git Service. 
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369925570 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala: ## @@ -2214,6 +2214,40 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { } } + /** + * Create a ListAgg expression. + */ + override def visitListAgg(ctx: ListAggContext): AnyRef = { +val column = expression(ctx.aggEpxr) +val sortOrder = visitSortItem(ctx.sortItem) +val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null) +if (!column.semanticEquals(sortOrder.child) && isDistinct) { + throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("LISTAGG", column, +sortOrder.child) +} +val listAgg = if (ctx.delimiter != null) { + sortOrder.direction match { +case Ascending => ListAgg(column, Literal(ctx.delimiter.getText), sortOrder.child, + false) +case Descending => ListAgg(column, Literal(ctx.delimiter.getText), sortOrder.child, + true) + } +} else { + sortOrder.direction match { +case Ascending => ListAgg(column, Literal(","), sortOrder.child, false) +case Descending => ListAgg(column, Literal(","), sortOrder.child, true) + } +} Review Comment: ``` val delimiter = ... val reverse = ... val listAgg = ListAgg(column, delimiter, sortOrder.child, reverse) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
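The refactoring proposed above computes the delimiter and the sort direction separately and then builds the expression once. A minimal Spark-free sketch of that shape follows; `ListAggModel`, `Direction` and `build` are hypothetical stand-ins for `ListAgg`, the sort direction and the body of `visitListAgg`, and the `Option[String]` delimiter models the nullable `ctx.delimiter`.

```scala
// Hypothetical model of the suggested structure, not the actual AstBuilder code.
object ListAggBuilderSketch {
  sealed trait Direction
  case object Ascending extends Direction
  case object Descending extends Direction

  // Stand-in for the ListAgg expression; strings replace Catalyst expressions.
  final case class ListAggModel(
      column: String,
      delimiter: String,
      orderBy: String,
      reverse: Boolean)

  def build(
      column: String,
      delimiterOpt: Option[String],
      orderBy: String,
      direction: Direction): ListAggModel = {
    val delimiter = delimiterOpt.getOrElse(",") // default delimiter when none is given
    val reverse = direction == Descending       // descending order means reversed output
    ListAggModel(column, delimiter, orderBy, reverse)
  }
}
```

Compared with the four-way match in the diff, each decision is made exactly once, so adding another option later would not double the number of branches.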
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369921181 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,117 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with BinaryLike[Expression] { + + def this(child: Expression) = +this(child, Literal.create(",", StringType), child, false, 0, 0) + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, child, false, 0, 0) + + private lazy val sameExpression = orderExpression.semanticEquals(child) + + override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) + override def defaultResult: Option[Literal] = Option(Literal.create("", StringType)) + + override protected lazy val bufferElementType: DataType = { +if (sameExpression) { + child.dataType +} else { + StructType(Seq( +StructField("value", child.dataType), 
+StructField("sortOrder", orderExpression.dataType))) +} + } + + override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { +if (buffer.nonEmpty) { + val ordering = PhysicalDataType.ordering(orderExpression.dataType) + val sorted = if (sameExpression) { +if (reverse) { + buffer.toSeq.sorted(ordering.reverse) +} else { + buffer.toSeq.sorted(ordering) +} + } else { +if (reverse) { + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0, +child.dataType)) +} else { + buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, + orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0, +child.dataType)) +} + } + UTF8String.fromString(sorted.map(_.toString) +.mkString(delimiter.eval().asInstanceOf[UTF8String].toString)) +} else { + UTF8String.fromString("") +} + } + + override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = { +val value = child.eval(input) +if (value != null) { + val v = if (sameExpression) { +convertToBufferElement(value) + } else { +InternalRow.apply(convertToBufferElement(value), + convertToBufferElement(orderExpression.eval(input))) + } + buffer += v +} +buffer + } + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset( + newMutableAggBufferOffset: Int) : ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def nullable: Boolean = false + + override def dataType: DataType = StringType + + override def left: Expression = child + + override def right: Expression = orderExpression Review Comment: Please put these functions forward: `createAggregationBuffer`, `nullable`, `dataType`, `left` and 
`right`. You can look at other functions for reference.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369844354 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: > ListAgg needs to save two fields. Yes, thank you for your patient reply. @beliefer
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369777027 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -36,8 +39,7 @@ import org.apache.spark.util.BoundedPriorityQueue * We have to store all the collected elements in memory, and so notice that too many elements * can cause GC paused and eventually OutOfMemory Errors. */ -abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] - with UnaryLike[Expression] { +abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] { Review Comment: The change to `Collect` looks good now.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369774164 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: I'm sorry, I get it now. `ListAgg` needs to save two fields. If so, please continue with this approach and push it forward.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369609547 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: > You can try to not define `bufferElementType`. These are the problems I need to solve. 1. If I use `CollectList` inside `ListAgg`, I would have to redefine `bufferElementType` in `CollectList` so that each `InternalRow` can hold two field values. (This does not seem possible at the moment.) 2. If I do not define `bufferElementType`, it seems I cannot use `CollectList` inside `ListAgg`. In that case I would not extend `Collect` at all, which avoids the conflict between `BinaryLike` and `UnaryLike`. 3. The current plan: I removed `UnaryLike` from `Collect` so that `ListAgg` can extend `Collect` and mix in `BinaryLike` at the same time. Which approach should I continue with?
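The trade-off between the options listed in this comment can be sketched without Spark. The traits below are simplified stand-ins, not Catalyst's real `UnaryLike` and `BinaryLike`; the sketch assumes, as the discussion implies, that each arity trait fixes `children` itself, so a single class cannot inherit both. All names (`CompositionSketch`, `ListCollector`, `ListAggNode`) are hypothetical. It shows the delegation route: hold the unary collector as a field instead of extending it.

```scala
object CompositionSketch {
  trait Node { def children: Seq[String] }

  // Stand-ins for the arity traits: each one pins down `children` as final.
  trait UnaryNode extends Node {
    def child: String
    final def children: Seq[String] = Seq(child)
  }
  trait BinaryNode extends Node {
    def left: String
    def right: String
    final def children: Seq[String] = Seq(left, right)
  }

  // A unary collector, playing the role that `CollectList` has in the thread.
  final class ListCollector(val child: String) extends UnaryNode {
    def collect(buffer: Vector[String], value: String): Vector[String] = buffer :+ value
  }

  // `class Bad extends UnaryNode with BinaryNode` would not compile in this sketch,
  // because both traits declare `children` as final. Holding the collector as a
  // field sidesteps that: the outer node is binary, the helper stays unary.
  final class ListAggNode(val left: String, val right: String) extends BinaryNode {
    private val collector = new ListCollector(left)
    def update(buffer: Vector[String], value: String): Vector[String] =
      collector.collect(buffer, value)
  }
}
```

The cost of this route, as raised in the thread, is that the helper's buffer element type is decided by the helper, so storing a second field per element still needs separate handling.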
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369579705 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: I see. You can try to not define `bufferElementType`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369493701 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: Thanks @beliefer. But it seems this doesn't solve the problem of storing the additional order-field value. PS: you can check the code I posted in an earlier comment, which is almost the same as what you shared: https://github.com/apache/spark/pull/42398#discussion_r1368586674
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369484714 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment:
```
ListAgg (...) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]]
  with BinaryLike[Expression] {
  // Delegate collection to a CollectList instance instead of extending Collect.
  private val collectList = CollectList()
  ...
  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
    collectList.update(...)
  }
  override def merge(buffer: ArrayBuffer[Any], other: Any): Any = {
    collectList.merge(...)
  }
  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
    val ordering = PhysicalDataType.ordering(orderExpr.dataType)
    val sorted = // sort logic here
    collectList.eval(sorted)
  }
}
```
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369484714 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment:
```
ListAgg (...) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]]
  with BinaryLike[Expression] {
  // Delegate collection to a CollectList instance instead of extending Collect.
  private val collectList = CollectList()
  ...
  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
    collectList.update(...)
  }
  override def merge(buffer: ArrayBuffer[Any], other: Any): Any = {
    collectList.merge(...)
  }
  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
    val ordering = PhysicalDataType.ordering(orderExpr.dataType)
    val sorted = // sort logic here
    new GenericArrayData(sorted.toArray)
  }
}
```
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369459076 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: This is a bit difficult for me to understand. The values of the two expressions have to be bound together before sorting, so I don't quite see what defining them separately would mean, or how the corresponding values could be collected separately, unless two `CollectList` instances are defined in `ListAgg`. Could you please provide a demo?
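One way to see why the two values have to travel together: partial buffers from different partitions are merged before the final sort, so the pairing must survive the merge. The following is a minimal sketch in plain Scala under that assumption; `MergeSketch`, `Entry`, `merge`, and `eval` are hypothetical names for illustration and not the PR's code.

```scala
object MergeSketch {
  // Each buffer element keeps the value and its sort key together (hypothetical layout).
  type Entry = (String, Int)

  // Merging partial buffers is plain concatenation; the value/key pairing is untouched.
  def merge(left: Vector[Entry], right: Vector[Entry]): Vector[Entry] = left ++ right

  // The final step sorts by the key, drops it, and joins the values.
  def eval(buffer: Vector[Entry], delimiter: String): String =
    buffer.sortBy(_._2).map(_._1).mkString(delimiter)
}
```

With two independent lists (one of values, one of keys), the same merge would require both lists to be concatenated in exactly the same order, which is the coupling this comment is asking about.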
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1369452950 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: You can declare another type field in `ListAgg` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1368615704 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: Because the ORDER BY column can differ from the aggregated column, we need to store both.
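To illustrate the point outside of Spark, here is a minimal, self-contained sketch in plain Scala. `ListAggSketch`, `Entry`, `update`, and `eval` are hypothetical names for this illustration and are not the PR's classes; the sort key is assumed to be an `Int` for simplicity. The buffer stores each value together with its sort key so that sorting can be deferred to the end.

```scala
import scala.collection.mutable

object ListAggSketch {
  // Hypothetical buffer element: the aggregated value plus the key it is ordered by.
  final case class Entry(value: String, sortKey: Int)

  // Null values are skipped, matching the NULL handling in the examples quoted above.
  def update(
      buffer: mutable.ArrayBuffer[Entry],
      value: String,
      sortKey: Int): mutable.ArrayBuffer[Entry] = {
    if (value != null) buffer += Entry(value, sortKey)
    buffer
  }

  // Sort by the stored key (optionally descending), then join only the values.
  def eval(
      buffer: mutable.ArrayBuffer[Entry],
      delimiter: String = ",",
      reverse: Boolean = false): String = {
    val ordering: Ordering[Int] = if (reverse) Ordering.Int.reverse else Ordering.Int
    buffer.toSeq.sortBy(_.sortKey)(ordering).map(_.value).mkString(delimiter)
  }
}
```

If only the values were stored, the key needed for the final sort would already be lost by the time `eval` runs, which is the reason given here for keeping both fields in the buffer element.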
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1368607666 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: Why does `bufferElementType` need to change?
## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with BinaryLike[Expression] { + + def this(child: Expression) = +this(child, Literal.create(",", StringType), child, false, 0, 0) + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, child, false, 0, 0) + + override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) + override def defaultResult: Option[Literal] = Option(Literal.create("", StringType)) + + override protected lazy val bufferElementType: DataType = { +StructType(Seq( + StructField("value", child.dataType), + StructField("sortOrder", orderExpression.dataType))) + } + + override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { +if (buffer.nonEmpty) { + val ordering = PhysicalDataType.ordering(orderExpression.dataType) + val sorted = if (reverse) { + 
buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, Review Comment: I think you can just save the temporary data in the `CollectList`'s buffer and then sort that buffer here.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1368586674 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: I tried that. It seems `bufferElementType` can't be changed in `CollectList`, but `ListAgg` needs to store the results of two expressions. This approach also means rewriting several `CollectList` methods such as `eval` and `update`, so I don't see the value of inheriting from `CollectList`; it seems only the generic type would be shared? Please point out my mistake. Thanks. Here is the code.
ListAgg ```scala case class ListAgg( child: Expression, delimiter: Expression = Literal.create(",", StringType), orderExpression: Expression, reverse: Boolean = false, mutableAggBufferOffset: Int = 0, inputAggBufferOffset: Int = 0) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]] with BinaryLike[Expression] { def this(child: Expression) = this(child, Literal.create(",", StringType), child, false, 0, 0) def this(child: Expression, delimiter: Expression) = this(child, delimiter, child, false, 0, 0) private lazy val collect = CollectList(child, mutableAggBufferOffset, inputAggBufferOffset) // override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) override def defaultResult: Option[Literal] = Option(Literal.create("", StringType)) // TODO seem like hard to change bufferElementType in collect // override protected lazy val bufferElementType: DataType = { //StructType(Seq( // StructField("value", child.dataType), // StructField("sortOrder", orderExpression.dataType))) // } override def merge(buffer: ArrayBuffer[Any], input: ArrayBuffer[Any]): ArrayBuffer[Any] = { collect.merge(buffer, input) } override def serialize(buffer: ArrayBuffer[Any]): Array[Byte] = { collect.serialize(buffer) } override def deserialize(storageFormat: Array[Byte]): ArrayBuffer[Any] = { collect.deserialize(storageFormat) } override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { if (buffer.nonEmpty) { val ordering = PhysicalDataType.ordering(orderExpression.dataType) val sorted = if (reverse) { buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0, child.dataType)) } else { buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1, orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0, child.dataType)) } UTF8String.fromString(sorted.map(_.toString) 
.mkString(delimiter.eval().asInstanceOf[UTF8String].toString)) } else { UTF8String.fromString("") } } override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = { val value = child.eval(input) if (value != null) { buffer += InternalRow.apply(collect.convertToBufferElement(value), collect.convertToBufferElement(orderExpression.eval(input))) } buffer } override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty override def withNewMutableAggBufferOffset( newMutableAggBufferOffset: Int) : ImperativeAggregate = copy(mutableAggBufferOffset = newMutableAggBufferOffset) override def
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1368419550 ## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ## @@ -988,6 +988,8 @@ primaryExpression | name=(PERCENTILE_CONT | PERCENTILE_DISC) LEFT_PAREN percentage=valueExpression RIGHT_PAREN WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)? ( OVER windowSpec)? #percentile +| LISTAGG LEFT_PAREN setQuantifier? aggEpxr=expression (COMMA delimiter=stringLit)? RIGHT_PAREN +WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN (OVER windowSpec)? #listAgg Review Comment: ```suggestion WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN (OVER windowSpec)? #listAgg ``` ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: The proxy pattern can also be used. ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -245,3 +249,98 @@ case class CollectTopK( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK = copy(inputAggBufferOffset = newInputAggBufferOffset) } + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + "" + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +orderExpression: Expression, +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] Review Comment: I think we should avoid changing `Collect`. We can declare a `CollectList` field in `ListAgg` and have `ListAgg` extend `BinaryLike[Expression]`.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1368090659 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], Review Comment: Thanks @beliefer , I did some updated. Please check again. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1359157402 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], Review Comment: You can compose them. Please refer to `PercentileCont` or `RegrSlope`.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1355031438 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], Review Comment: Hi @holdenk @beliefer, I made some updates to use `extends Collect`, but one problem is bothering me. ListAgg needs to keep two expressions separately, because the expression used for aggregation and the one used for sorting may differ. However, Collect is an implementation of `UnaryLike`, which can have only one child, so the other expression cannot be parsed.
Any suggestions?
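To make the one-child limitation concrete, here is a minimal sketch in plain Scala. It does not use Spark's Catalyst classes: `Expr`, `UnaryLikeSketch`, `BinaryLikeSketch`, `OneChildAgg`, `TwoChildAgg` and `collectColumns` are hypothetical names invented for illustration. The assumption is only that tree traversals see what a node reports through `children`, so an expression stored as a plain field of a one-child node is skipped, while a two-child node exposes both.

```scala
object ChildrenSketch {
  // Hypothetical, simplified expression tree (not Spark's Expression).
  sealed trait Expr { def children: Seq[Expr] }

  final case class Column(name: String) extends Expr {
    val children: Seq[Expr] = Nil
  }

  // A node that reports exactly one child to tree traversals.
  trait UnaryLikeSketch extends Expr {
    def child: Expr
    final def children: Seq[Expr] = Seq(child)
  }

  // A node that reports two children to tree traversals.
  trait BinaryLikeSketch extends Expr {
    def left: Expr
    def right: Expr
    final def children: Seq[Expr] = Seq(left, right)
  }

  // The order expression is only a field here, so traversals never visit it.
  final case class OneChildAgg(child: Expr, orderBy: Expr) extends UnaryLikeSketch

  // Both the aggregated and the order expression are real children.
  final case class TwoChildAgg(left: Expr, right: Expr) extends BinaryLikeSketch

  // Walks the tree through `children` only, as a generic traversal would.
  def collectColumns(e: Expr): Seq[String] = e match {
    case Column(name) => Seq(name)
    case other => other.children.flatMap(collectColumns)
  }
}
```

In this sketch, `collectColumns(OneChildAgg(Column("v"), Column("k")))` visits only `v`, whereas the two-child variant visits both `v` and `k`.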
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1351167115 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], Review Comment: I think this issue would be avoided if we refactor to reuse `CollectList` or to `extends Collect`.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
holdenk commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1351071287 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], Review Comment: So the idea with that is to bubble up the distinct to avoid a shuffle? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
holdenk commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1351070023 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( Review Comment: Similarly, let's put this in collect.scala with the other collect friends.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349881268 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( Review Comment: `ListAgg` stores values too. I think reusing `CollectList` or extending `Collect` is better.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349839317 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], + input: InternalRow): OpenHashMap[AnyRef, Long] = { +val value = child.eval(input) +if (value != null) { + val key = InternalRow.copyValue(value) Review Comment: The value could be a `UTF8String`; without the copy, the map key could change after it has been put into the map. Other places, such as `CollectList`, do it the same way.
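The concern about the map key changing can be sketched without Spark. In the example below, `BufferBackedString` is a hypothetical stand-in for a value that is only a view over a reusable byte buffer (it is not Spark's `UTF8String`), and `countValues` is an invented helper that counts single-character rows. Under that assumption, storing the view itself as a key lets later rows overwrite the bytes behind keys that are already in the map, while a defensive copy keeps each key stable.

```scala
import scala.collection.mutable

// Hypothetical value type backed by a buffer that the producer may reuse.
final class BufferBackedString(val bytes: Array[Byte]) {
  // Defensive copy: the new instance owns its own bytes.
  def copy(): BufferBackedString = new BufferBackedString(bytes.clone())
  override def hashCode(): Int = java.util.Arrays.hashCode(bytes)
  override def equals(other: Any): Boolean = other match {
    case that: BufferBackedString => java.util.Arrays.equals(bytes, that.bytes)
    case _ => false
  }
  override def toString: String = new String(bytes, "UTF-8")
}

object CopyKeyDemo {
  // Counts single-character rows read through one shared, reused buffer.
  def countValues(rows: Seq[String], copyKey: Boolean): Map[String, Long] = {
    val shared = new Array[Byte](1)
    val view = new BufferBackedString(shared)
    val counts = mutable.LinkedHashMap.empty[BufferBackedString, Long]
    rows.foreach { row =>
      shared(0) = row.getBytes("UTF-8")(0) // overwrite the buffer in place
      val key = if (copyKey) view.copy() else view
      counts.update(key, counts.getOrElse(key, 0L) + 1L)
    }
    // Render the keys only at the end, as a final evaluation step would.
    counts.iterator.map { case (k, v) => k.toString -> v }.toMap
  }
}
```

With `copyKey = true`, the rows `a`, `b`, `a` are counted as two `a` and one `b`. With `copyKey = false`, every stored key points at the same buffer, so the entry for `b` is lost by the time the keys are read back.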
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349837194 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], Review Comment: So use a List when `DISTINCT` is not specified, and use a Set when `DISTINCT` is specified.
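The List-versus-Set suggestion can be sketched in plain Scala as follows. This is an illustration only, not the PR's implementation: `ListAggBuffers`, `listAgg` and `listAggDistinct` are hypothetical names, the inputs are plain strings with `null` standing for SQL NULL, and `None` stands for a NULL result. The expected behaviour is taken from the examples in the `@ExpressionDescription` quoted in the review.

```scala
import scala.collection.mutable

object ListAggBuffers {
  // Without DISTINCT: a list-like buffer keeps every non-null value, duplicates included.
  def listAgg(values: Seq[String], delimiter: String = ","): Option[String] = {
    val buffer = mutable.ArrayBuffer.empty[String]
    values.foreach(v => if (v != null) buffer += v)
    if (buffer.isEmpty) None else Some(buffer.mkString(delimiter))
  }

  // With DISTINCT: a set buffer drops duplicates; LinkedHashSet keeps first-seen order.
  def listAggDistinct(values: Seq[String], delimiter: String = ","): Option[String] = {
    val buffer = mutable.LinkedHashSet.empty[String]
    values.foreach(v => if (v != null) buffer += v)
    if (buffer.isEmpty) None else Some(buffer.mkString(delimiter))
  }
}
```

For the inputs `a`, `a` the list variant yields `a,a`, and for `a`, `a`, `b` the set variant yields `a,b`, which matches the documented examples.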
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349836810 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], + input: InternalRow): OpenHashMap[AnyRef, Long] = { +val value = child.eval(input) +if (value != null) { + val key = InternalRow.copyValue(value) Review Comment: The key is cached here and is not related to `InternalRow`.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349836243 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( Review Comment: That would work. But it seems `CollectList` would use more memory, because it stores every value rather than the number of times each value occurs? Should I do that?
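The memory trade-off raised here can be sketched as a buffer that stores one entry per distinct value together with its occurrence count, instead of one entry per input row. The code below is a plain-Scala illustration, not the PR's `OpenHashMap`-based code: `CountingBuffer` and its `update`, `merge` and `eval` helpers are hypothetical, and emitting the values in sorted key order (optionally reversed) is an assumption made only so that the output is deterministic.

```scala
import scala.collection.mutable

object CountingBuffer {
  // Record one occurrence of a non-null value.
  def update(buffer: mutable.Map[String, Long], value: String): Unit =
    if (value != null) buffer.update(value, buffer.getOrElse(value, 0L) + 1L)

  // Combine a partial buffer from another partition by adding the counts.
  def merge(buffer: mutable.Map[String, Long], other: scala.collection.Map[String, Long]): Unit =
    other.foreach { case (k, n) => buffer.update(k, buffer.getOrElse(k, 0L) + n) }

  // Expand each value as many times as it was seen; None models a NULL result.
  def eval(
      buffer: scala.collection.Map[String, Long],
      delimiter: String = ",",
      reverse: Boolean = false): Option[String] =
    if (buffer.isEmpty) None
    else {
      val keys = buffer.keys.toSeq.sorted
      val ordered = if (reverse) keys.reverse else keys
      Some(ordered.flatMap(k => Seq.fill(buffer(k).toInt)(k)).mkString(delimiter))
    }
}
```

The buffer grows with the number of distinct values rather than the number of rows, but the original arrival order of the rows is not recoverable from it, which is one reason a list-based buffer may still be preferred.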
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349833814 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], Review Comment: A Set or List cannot store duplicate values. The map is there to make sure the function works correctly without a `DISTINCT` expression. E.g. `a` and `a` should produce `a,a`, not `a`.
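A minimal sketch of the value-to-count idea from the comment above, assuming plain Scala collections rather than Spark's `OpenHashMap[AnyRef, Long]`. The object `CountMapListAggSketch` and its `listAgg` signature are hypothetical and are not the PR's implementation. It shows how a count per value lets the same buffer render `a,a` without `DISTINCT` and `a` with it.

```scala
import scala.collection.mutable

// Hypothetical illustration only: not the ListAgg expression from the PR.
object CountMapListAggSketch {
  def listAgg(
      values: Seq[String],
      delimiter: String = ",",
      distinct: Boolean = false): Option[String] = {
    // Insertion-ordered map from value to number of occurrences; nulls are skipped.
    val counts = mutable.LinkedHashMap.empty[String, Long]
    values.foreach { v =>
      if (v != null) counts(v) = counts.getOrElse(v, 0L) + 1L
    }
    if (counts.isEmpty) {
      None // stands in for a SQL NULL result
    } else {
      // Emit each value once under DISTINCT, otherwise once per occurrence.
      val parts = counts.iterator.flatMap { case (value, n) =>
        Iterator.fill(if (distinct) 1 else n.toInt)(value)
      }
      Some(parts.mkString(delimiter))
    }
  }
}
```

One property of this sketch worth noting: equal values are emitted next to each other, so the input `a, b, a` renders as `a,a,b` rather than in arrival order.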
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349833053 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], + input: InternalRow): OpenHashMap[AnyRef, Long] = { +val value = child.eval(input) +if (value != null) { + val key = InternalRow.copyValue(value) Review Comment: The value in the `InternalRow` will be overwritten, so we must copy it.
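The reason for copying can be sketched without Spark at all. In the example below, `ReusedCell` is a hypothetical stand-in for a row value whose backing storage the producer reuses for every record. It is an assumption for illustration and says nothing about how `InternalRow` is actually implemented. Keeping a reference to the reused object means every buffered entry ends up showing the last value written.

```scala
import scala.collection.mutable

// Hypothetical illustration only: ReusedCell is a stand-in, not a Spark class.
object CopyBeforeBufferSketch {
  // A mutable holder that the producer overwrites for each record.
  final class ReusedCell(var text: String)

  def collect(inputs: Seq[String], copy: Boolean): Seq[String] = {
    val cell = new ReusedCell("")
    val kept = mutable.ArrayBuffer.empty[ReusedCell]
    inputs.foreach { s =>
      cell.text = s // the producer overwrites the same object in place
      // Either snapshot the current value or keep a reference to the shared object.
      kept += (if (copy) new ReusedCell(cell.text) else cell)
    }
    kept.map(_.text).toList
  }
}
```

With `copy = false` the buffer holds three references to one object, so all entries read as the final input. With `copy = true` each entry keeps its own snapshot.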
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349832245 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) + + override def update( + buffer: OpenHashMap[AnyRef, Long], Review Comment: It seems we only need set or list here. ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg(
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349829226 ## sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala: ## @@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } + test("SPARK-42746: listagg function") { +withTempView("df", "df2") { + Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b") +.createOrReplaceTempView("df") + checkAnswer( +sql("select listagg(b) from df group by a"), +Row("b,c") :: Row("c,d") :: Row(null) :: Nil) + + checkAnswer( +sql("select listagg(b, '|') from df group by a"), +Row("b|c") :: Row("c|d") :: Row(null) :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(DISTINCT a) FROM df"), +Row("a,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"), +Row("b,b,a,a") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " + + "OVER (PARTITION BY b) FROM df"), +Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil) + + checkError( +exception = intercept[AnalysisException] { + sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df") Review Comment: Oh, this is a mistake. Let me fix it.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349828939 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + a,a + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( Review Comment: On second thought, can we reuse `CollectList`, which is an existing aggregate function?
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349828105 ## sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala: ## @@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } + test("SPARK-42746: listagg function") { +withTempView("df", "df2") { + Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b") +.createOrReplaceTempView("df") + checkAnswer( +sql("select listagg(b) from df group by a"), +Row("b,c") :: Row("c,d") :: Row(null) :: Nil) + + checkAnswer( +sql("select listagg(b, '|') from df group by a"), +Row("b|c") :: Row("c|d") :: Row(null) :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(DISTINCT a) FROM df"), +Row("a,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"), +Row("b,b,a,a") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " + + "OVER (PARTITION BY b) FROM df"), +Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil) + + checkError( +exception = intercept[AnalysisException] { + sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df") Review Comment: But this test case does not use the `DISTINCT` keyword.
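The point of the comment above can be made concrete with a small sketch of the rule as it appears in the `visitListAgg` snippet quoted at the top of this thread: the mismatch is reported only when `DISTINCT` is present and the aggregated column differs from the `ORDER BY` column. The object `ListAggOrderCheckSketch` is hypothetical. Plain strings stand in for Catalyst expressions, and string inequality stands in for `semanticEquals`.

```scala
// Hypothetical illustration only: strings replace Catalyst expressions.
object ListAggOrderCheckSketch {
  def validate(
      aggColumn: String,
      orderColumn: String,
      isDistinct: Boolean): Either[String, Unit] =
    // Same shape as the quoted condition: mismatch AND distinct.
    if (isDistinct && aggColumn != orderColumn) {
      Left(s"FUNCTION_AND_ORDER_EXPRESSION_MISMATCH: LISTAGG($aggColumn) ordered by $orderColumn")
    } else {
      Right(())
    }
}
```

Under this rule, `LISTAGG(a) WITHIN GROUP (ORDER BY b)` without `DISTINCT` passes validation, which is why a test expecting an error for that query looks inconsistent.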
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349825602 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); Review Comment: Done
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349822359 ## sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala: ## @@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } + test("SPARK-42746: listagg function") { +withTempView("df", "df2") { + Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b") +.createOrReplaceTempView("df") + checkAnswer( +sql("select listagg(b) from df group by a"), +Row("b,c") :: Row("c,d") :: Row(null) :: Nil) + + checkAnswer( +sql("select listagg(b, '|') from df group by a"), +Row("b|c") :: Row("c|d") :: Row(null) :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(DISTINCT a) FROM df"), +Row("a,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"), +Row("b,b,a,a") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " + + "OVER (PARTITION BY b) FROM df"), +Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil) + + checkError( +exception = intercept[AnalysisException] { + sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df") Review Comment: Yes, so I added the `FUNCTION_AND_ORDER_EXPRESSION_MISMATCH` error for this.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on PR #42398: URL: https://github.com/apache/spark/pull/42398#issuecomment-1752261690 > @Hisoka-X Is `LISTAGG` an ANSI standard? Yes, see https://modern-sql.com/feature/listagg
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349818074 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); Review Comment: Could you add an example with duplicate values, both with and without `DISTINCT`?
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349817389 ## sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala: ## @@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } + test("SPARK-42746: listagg function") { +withTempView("df", "df2") { + Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b") +.createOrReplaceTempView("df") + checkAnswer( +sql("select listagg(b) from df group by a"), +Row("b,c") :: Row("c,d") :: Row(null) :: Nil) + + checkAnswer( +sql("select listagg(b, '|') from df group by a"), +Row("b|c") :: Row("c|d") :: Row(null) :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(DISTINCT a) FROM df"), +Row("a,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"), +Row("a,a,b,b") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"), +Row("b,b,a,a") :: Nil) + + checkAnswer( +sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " + + "OVER (PARTITION BY b) FROM df"), +Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil) + + checkError( +exception = intercept[AnalysisException] { + sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df") Review Comment: Is this behavior correct? According to https://docs.snowflake.com/en/sql-reference/functions/listagg, the mismatch error is raised if you specify different columns for `DISTINCT` and `WITHIN GROUP`. ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); Review Comment: Could you add an example with duplicate values?
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on PR #42398: URL: https://github.com/apache/spark/pull/42398#issuecomment-1752242004 @Hisoka-X Is `LISTAGG` an ANSI standard?
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349810729 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) Review Comment: They are used for SQL expressions: they are invoked when the function is called from SQL, e.g. `listagg(b)`.
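Some background on why default parameters alone may not be enough here, as a hedged sketch. Scala compiles default arguments into helper methods on the companion object rather than into overloaded constructors, so Java reflection sees only the full-arity constructor. If the function lookup resolves constructors reflectively by argument count (an assumption about Spark's function registry that this thread does not confirm), the shorter auxiliary constructors would be what it finds. The object and class names below are hypothetical.

```scala
// Hypothetical illustration only: String parameters stand in for Expression.
object ConstructorSketch {
  // Default arguments only: a single public constructor is emitted.
  final case class DefaultsOnly(
      child: String,
      delimiter: String = ",",
      reverse: Boolean = false)

  // Same defaults plus explicit auxiliary constructors.
  final case class WithAuxiliary(
      child: String,
      delimiter: String = ",",
      reverse: Boolean = false) {
    def this(child: String) = this(child, ",", false)
    def this(child: String, delimiter: String) = this(child, delimiter, false)
  }

  // Arities of the public constructors visible through Java reflection.
  def arities(cls: Class[_]): List[Int] =
    cls.getConstructors.map(_.getParameterCount).sorted.toList
}
```

Calling `ConstructorSketch.arities` on the two classes shows the difference: the defaults-only class exposes one three-argument constructor, while the class with auxiliary constructors also exposes one-argument and two-argument forms.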
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
khalidmammadov commented on code in PR #42398: URL: https://github.com/apache/spark/pull/42398#discussion_r1349758338 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the concatenated input values," + +" separated by the delimiter string.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a,b,c + > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col); + a,b + > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col); + a|b + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + group = "agg_funcs", + since = "4.0.0") +case class ListAgg( +child: Expression, +delimiter: Expression = Literal.create(",", StringType), +reverse: Boolean = false, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer + with UnaryLike[Expression] { + + def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0) + def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0) Review Comment: What is the value of these two auxiliary constructors when the primary constructor already supplies the same values as default parameters?