This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ab67f461987 [SPARK-43986][SQL] Create error classes for HyperLogLog function call failures ab67f461987 is described below commit ab67f4619873f21b5dcf7f67658afce7e1028657 Author: Daniel Tenedorio <daniel.tenedo...@databricks.com> AuthorDate: Fri Jun 30 19:44:14 2023 +0300 [SPARK-43986][SQL] Create error classes for HyperLogLog function call failures ### What changes were proposed in this pull request? This PR creates error classes for HyperLogLog function call failures. ### Why are the changes needed? These replace previous Java exceptions or other cases, in order to improve the user experience and bring consistency with other parts of Spark. ### Does this PR introduce _any_ user-facing change? Yes, error messages change slightly. ### How was this patch tested? This PR also adds SQL query test files for the HLL functions. Closes #41486 from dtenedor/hll-error-classes. Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../src/main/resources/error/error-classes.json | 15 + .../aggregate/datasketchesAggregates.scala | 71 +++-- .../expressions/datasketchesExpressions.scala | 29 +- .../spark/sql/errors/QueryExecutionErrors.scala | 26 ++ .../sql-tests/analyzer-results/hll.sql.out | 215 +++++++++++++ .../src/test/resources/sql-tests/inputs/hll.sql | 76 +++++ .../test/resources/sql-tests/results/hll.sql.out | 262 ++++++++++++++++ .../apache/spark/sql/DataFrameAggregateSuite.scala | 338 ++++++++++++--------- 8 files changed, 850 insertions(+), 182 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index db6b9a97012..abe88db1267 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -782,6 +782,21 @@ "The expression <sqlExpr> cannot be used as a grouping expression because 
its data type <dataType> is not an orderable data type." ] }, + "HLL_INVALID_INPUT_SKETCH_BUFFER" : { + "message" : [ + "Invalid call to <function>; only valid HLL sketch buffers are supported as inputs (such as those produced by the `hll_sketch_agg` function)." + ] + }, + "HLL_INVALID_LG_K" : { + "message" : [ + "Invalid call to <function>; the `lgConfigK` value must be between <min> and <max>, inclusive: <value>." + ] + }, + "HLL_UNION_DIFFERENT_LG_K" : { + "message" : [ + "Sketches have different `lgConfigK` values: <left> and <right>. Set the `allowDifferentLgConfigK` parameter to true to call <function> with different `lgConfigK` values." + ] + }, "IDENTIFIER_TOO_MANY_NAME_PARTS" : { "message" : [ "<identifier> is not a valid identifier as it has more than 2 name parts." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala index 8b24efe12b4..17c69f798d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala @@ -17,23 +17,23 @@ package org.apache.spark.sql.catalyst.expressions.aggregate -import org.apache.datasketches.SketchesArgumentException import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union} import org.apache.datasketches.memory.Memory import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal} import org.apache.spark.sql.catalyst.trees.BinaryLike +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, DataType, IntegerType, LongType, StringType, TypeCollection} import org.apache.spark.unsafe.types.UTF8String 
/** - * The HllSketchAgg function utilizes a Datasketches HllSketch instance to - * count a probabilistic approximation of the number of unique values in - * a given column, and outputs the binary representation of the HllSketch. + * The HllSketchAgg function utilizes a Datasketches HllSketch instance to count a probabilistic + * approximation of the number of unique values in a given column, and outputs the binary + * representation of the HllSketch. * - * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information + * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information. * * @param left child expression against which unique counting will occur * @param right the log-base-2 of K, where K is the number of buckets or slots for the sketch @@ -41,7 +41,7 @@ import org.apache.spark.unsafe.types.UTF8String // scalastyle:off line.size.limit @ExpressionDescription( usage = """ - _FUNC_(expr, lgConfigK) - Returns the HllSketch's updateable binary representation. + _FUNC_(expr, lgConfigK) - Returns the HllSketch's updatable binary representation. `lgConfigK` (optional) the log-base-2 of K, with K is the number of buckets or slots for the HllSketch. """, examples = """ @@ -117,9 +117,9 @@ case class HllSketchAgg( } /** - * Evaluate the input row and update the HllSketch instance with the row's value. - * The update function only supports a subset of Spark SQL types, and an - * UnsupportedOperationException will be thrown for unsupported types. + * Evaluate the input row and update the HllSketch instance with the row's value. The update + * function only supports a subset of Spark SQL types, and an exception will be thrown for + * unsupported types. * * @param sketch The HllSketch instance. 
* @param input an input row @@ -128,10 +128,10 @@ case class HllSketchAgg( val v = left.eval(input) if (v != null) { left.dataType match { - // Update implemented for a subset of types supported by HllSketch - // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out - // Leaving out support for Array types, as unique counting these aren't a common use case - // Leaving out support for floating point types (IE DoubleType) due to imprecision + // This is implemented for a subset of input data types. + // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out. + // We leave out support for Array types, as unique counting of these isn't a common use case. + // We leave out support for floating point types (such as DoubleType) due to imprecision. // TODO: implement support for decimal/datetime/interval types case IntegerType => sketch.update(v.asInstanceOf[Int]) case LongType => sketch.update(v.asInstanceOf[Long]) @@ -189,21 +189,20 @@ object HllSketchAgg { private val minLgConfigK = 4 private val maxLgConfigK = 21 - // Replicate Datasketche's HllUtil's checkLgK implementation, as we can't reference it directly + // Replicate Datasketches' HllUtil's checkLgK implementation, as we can't reference it directly. def checkLgK(lgConfigK: Int): Unit = { if (lgConfigK < minLgConfigK || lgConfigK > maxLgConfigK) { - throw new SketchesArgumentException( - s"Log K must be between $minLgConfigK and $maxLgConfigK, inclusive: " + lgConfigK) + throw QueryExecutionErrors.hllInvalidLgK(function = "hll_sketch_agg", + min = minLgConfigK, max = maxLgConfigK, value = lgConfigK.toString) } } } /** - * The HllUnionAgg function ingests and merges Datasketches HllSketch - * instances previously produced by the HllSketchBinary function, and - * outputs the merged HllSketch. 
+ * The HllUnionAgg function ingests and merges Datasketches HllSketch instances previously produced + * by the HllSketchBinary function, and outputs the merged HllSketch. * - * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information + * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information. * * @param left Child expression against which unique counting will occur * @param right Allow sketches with different lgConfigK values @@ -286,20 +285,15 @@ case class HllUnionAgg( } /** - * Helper method to compare lgConfigKs and throw an exception if - * allowDifferentLgConfigK isn't true and configs don't match + * Helper method to compare lgConfigKs and throw an exception if `allowDifferentLgConfigK` isn't + * true and configs don't match. * * @param left An lgConfigK value * @param right An lgConfigK value */ def compareLgConfigK(left: Int, right: Int): Unit = { - if (!allowDifferentLgConfigK) { - if (left != right) { - throw new UnsupportedOperationException( - s"Sketches have different lgConfigK values: $left and $right. 
" + - "Set allowDifferentLgConfigK to true to enable unions of " + - "different lgConfigK values.") - } + if (!allowDifferentLgConfigK && left != right) { + throw QueryExecutionErrors.hllUnionDifferentLgK(left, right, function = prettyName) } } @@ -314,13 +308,18 @@ case class HllUnionAgg( if (v != null) { left.dataType match { case BinaryType => - val sketch = HllSketch.wrap(Memory.wrap(v.asInstanceOf[Array[Byte]])) - val union = unionOption.getOrElse(new Union(sketch.getLgConfigK)) - compareLgConfigK(union.getLgConfigK, sketch.getLgConfigK) - union.update(sketch) - Some(union) - case _ => throw new UnsupportedOperationException( - s"A Union instance can only be updated with a valid HllSketch byte array") + try { + val sketch = HllSketch.wrap(Memory.wrap(v.asInstanceOf[Array[Byte]])) + val union = unionOption.getOrElse(new Union(sketch.getLgConfigK)) + compareLgConfigK(union.getLgConfigK, sketch.getLgConfigK) + union.update(sketch) + Some(union) + } catch { + case _: java.lang.Error => + throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName) + } + case _ => + throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName) } } else { unionOption diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala index 2f1c865e12b..9e3fe52f534 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala @@ -21,6 +21,7 @@ import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union} import org.apache.datasketches.memory.Memory import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, DataType, 
LongType} @ExpressionDescription( @@ -51,7 +52,12 @@ case class HllSketchEstimate(child: Expression) override def nullSafeEval(input: Any): Any = { val buffer = input.asInstanceOf[Array[Byte]] - Math.round(HllSketch.heapify(Memory.wrap(buffer)).getEstimate) + try { + Math.round(HllSketch.heapify(Memory.wrap(buffer)).getEstimate) + } catch { + case _: java.lang.Error => + throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName) + } } } @@ -98,15 +104,22 @@ case class HllUnion(first: Expression, second: Expression, third: Expression) override def dataType: DataType = BinaryType override def nullSafeEval(value1: Any, value2: Any, value3: Any): Any = { - val sketch1 = HllSketch.heapify(Memory.wrap(value1.asInstanceOf[Array[Byte]])) - val sketch2 = HllSketch.heapify(Memory.wrap(value2.asInstanceOf[Array[Byte]])) + val sketch1 = try { + HllSketch.heapify(Memory.wrap(value1.asInstanceOf[Array[Byte]])) + } catch { + case _: java.lang.Error => + throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName) + } + val sketch2 = try { + HllSketch.heapify(Memory.wrap(value2.asInstanceOf[Array[Byte]])) + } catch { + case _: java.lang.Error => + throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName) + } val allowDifferentLgConfigK = value3.asInstanceOf[Boolean] if (!allowDifferentLgConfigK && sketch1.getLgConfigK != sketch2.getLgConfigK) { - throw new UnsupportedOperationException( - "Sketches have different lgConfigK values: " + - s"${sketch1.getLgConfigK} and ${sketch2.getLgConfigK}. 
" + - "Set allowDifferentLgConfigK to true to enable unions of " + - "different lgConfigK values.") + throw QueryExecutionErrors.hllUnionDifferentLgK( + sketch1.getLgConfigK, sketch2.getLgConfigK, function = prettyName) } val union = new Union(Math.min(sketch1.getLgConfigK, sketch2.getLgConfigK)) union.update(sketch1) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index c31d01162c5..59b66bd4343 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2855,6 +2855,32 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { "enumString" -> enumString)) } + def hllInvalidLgK(function: String, min: Int, max: Int, value: String): Throwable = { + new SparkRuntimeException( + errorClass = "HLL_INVALID_LG_K", + messageParameters = Map( + "function" -> toSQLId(function), + "min" -> toSQLValue(min, IntegerType), + "max" -> toSQLValue(max, IntegerType), + "value" -> value)) + } + + def hllInvalidInputSketchBuffer(function: String): Throwable = { + new SparkRuntimeException( + errorClass = "HLL_INVALID_INPUT_SKETCH_BUFFER", + messageParameters = Map( + "function" -> toSQLId(function))) + } + + def hllUnionDifferentLgK(left: Int, right: Int, function: String): Throwable = { + new SparkRuntimeException( + errorClass = "HLL_UNION_DIFFERENT_LG_K", + messageParameters = Map( + "left" -> toSQLValue(left, IntegerType), + "right" -> toSQLValue(right, IntegerType), + "function" -> toSQLId(function))) + } + def mergeCardinalityViolationError(): SparkRuntimeException = { new SparkRuntimeException( errorClass = "MERGE_CARDINALITY_VIOLATION", diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out new file mode 100644 index 
00000000000..58391c0054c --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out @@ -0,0 +1,215 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +DROP TABLE IF EXISTS t1 +-- !query analysis +DropTable true, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t1 + + +-- !query +CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col) +-- !query analysis +CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`t1`, ErrorIfExists, [col] + +- SubqueryAlias tab + +- LocalRelation [col#x] + + +-- !query +SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1 +-- !query analysis +Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS result#xL] ++- SubqueryAlias spark_catalog.default.t1 + +- Relation spark_catalog.default.t1[col#x] json + + +-- !query +SELECT hll_sketch_estimate(hll_sketch_agg(col, 12)) +FROM VALUES (50), (60), (60), (60), (75), (100) tab(col) +-- !query analysis +Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS hll_sketch_estimate(hll_sketch_agg(col, 12))#xL] ++- SubqueryAlias tab + +- LocalRelation [col#x] + + +-- !query +SELECT hll_sketch_estimate(hll_sketch_agg(col)) +FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col) +-- !query analysis +Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS hll_sketch_estimate(hll_sketch_agg(col, 12))#xL] ++- SubqueryAlias tab + +- LocalRelation [col#x] + + +-- !query +SELECT hll_sketch_estimate( + hll_union( + hll_sketch_agg(col1), + hll_sketch_agg(col2))) + FROM VALUES + (1, 4), + (1, 4), + (2, 5), + (2, 5), + (3, 6) AS tab(col1, col2) +-- !query analysis +Aggregate [hll_sketch_estimate(hll_union(hll_sketch_agg(col1#x, 12, 0, 0), hll_sketch_agg(col2#x, 12, 0, 0), false)) AS hll_sketch_estimate(hll_union(hll_sketch_agg(col1, 12), hll_sketch_agg(col2, 12), false))#xL] ++- SubqueryAlias tab + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT 
hll_sketch_estimate(hll_union_agg(sketch, true)) + FROM (SELECT hll_sketch_agg(col) as sketch + FROM VALUES (1) AS tab(col) + UNION ALL + SELECT hll_sketch_agg(col, 20) as sketch + FROM VALUES (1) AS tab(col)) +-- !query analysis +Aggregate [hll_sketch_estimate(hll_union_agg(sketch#x, true, 0, 0)) AS hll_sketch_estimate(hll_union_agg(sketch, true))#xL] ++- SubqueryAlias __auto_generated_subquery_name + +- Union false, false + :- Aggregate [hll_sketch_agg(col#x, 12, 0, 0) AS sketch#x] + : +- SubqueryAlias tab + : +- LocalRelation [col#x] + +- Aggregate [hll_sketch_agg(col#x, 20, 0, 0) AS sketch#x] + +- SubqueryAlias tab + +- LocalRelation [col#x] + + +-- !query +SELECT hll_sketch_agg(col) +FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)) tab(col) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"col\"", + "inputType" : "\"ARRAY<INT>\"", + "paramIndex" : "1", + "requiredType" : "(\"INT\" or \"BIGINT\" or \"STRING\" or \"BINARY\")", + "sqlExpr" : "\"hll_sketch_agg(col, 12)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 26, + "fragment" : "hll_sketch_agg(col)" + } ] +} + + +-- !query +SELECT hll_sketch_agg(col, 2) +FROM VALUES (50), (60), (60) tab(col) +-- !query analysis +Aggregate [hll_sketch_agg(col#x, 2, 0, 0) AS hll_sketch_agg(col, 2)#x] ++- SubqueryAlias tab + +- LocalRelation [col#x] + + +-- !query +SELECT hll_sketch_agg(col, 40) +FROM VALUES (50), (60), (60) tab(col) +-- !query analysis +Aggregate [hll_sketch_agg(col#x, 40, 0, 0) AS hll_sketch_agg(col, 40)#x] ++- SubqueryAlias tab + +- LocalRelation [col#x] + + +-- !query +SELECT hll_union( + hll_sketch_agg(col1, 12), + hll_sketch_agg(col2, 13)) + FROM VALUES + (1, 4), + (1, 4), + (2, 5), + (2, 5), + (3, 6) AS tab(col1, col2) +-- !query analysis +Aggregate [hll_union(hll_sketch_agg(col1#x, 12, 0, 0), 
hll_sketch_agg(col2#x, 13, 0, 0), false) AS hll_union(hll_sketch_agg(col1, 12), hll_sketch_agg(col2, 13), false)#x] ++- SubqueryAlias tab + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT hll_union_agg(sketch, false) +FROM (SELECT hll_sketch_agg(col, 12) as sketch + FROM VALUES (1) AS tab(col) + UNION ALL + SELECT hll_sketch_agg(col, 20) as sketch + FROM VALUES (1) AS tab(col)) +-- !query analysis +Aggregate [hll_union_agg(sketch#x, false, 0, 0) AS hll_union_agg(sketch, false)#x] ++- SubqueryAlias __auto_generated_subquery_name + +- Union false, false + :- Aggregate [hll_sketch_agg(col#x, 12, 0, 0) AS sketch#x] + : +- SubqueryAlias tab + : +- LocalRelation [col#x] + +- Aggregate [hll_sketch_agg(col#x, 20, 0, 0) AS sketch#x] + +- SubqueryAlias tab + +- LocalRelation [col#x] + + +-- !query +SELECT hll_union(1, 2) + FROM VALUES + (1, 4), + (1, 4), + (2, 5), + (2, 5), + (3, 6) AS tab(col1, col2) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"1\"", + "inputType" : "\"INT\"", + "paramIndex" : "1", + "requiredType" : "\"BINARY\"", + "sqlExpr" : "\"hll_union(1, 2, false)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 22, + "fragment" : "hll_union(1, 2)" + } ] +} + + +-- !query +SELECT hll_sketch_estimate(CAST ('abc' AS BINARY)) +-- !query analysis +Project [hll_sketch_estimate(cast(abc as binary)) AS hll_sketch_estimate(CAST(abc AS BINARY))#xL] ++- OneRowRelation + + +-- !query +SELECT hll_union(CAST ('abc' AS BINARY), CAST ('def' AS BINARY)) +-- !query analysis +Project [hll_union(cast(abc as binary), cast(def as binary), false) AS hll_union(CAST(abc AS BINARY), CAST(def AS BINARY), false)#x] ++- OneRowRelation + + +-- !query +SELECT hll_union_agg(buffer, false) +FROM (SELECT CAST('abc' AS BINARY) AS buffer) +-- !query analysis +Aggregate 
[hll_union_agg(buffer#x, false, 0, 0) AS hll_union_agg(buffer, false)#x] ++- SubqueryAlias __auto_generated_subquery_name + +- Project [cast(abc as binary) AS buffer#x] + +- OneRowRelation + + +-- !query +DROP TABLE IF EXISTS t1 +-- !query analysis +DropTable true, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t1 diff --git a/sql/core/src/test/resources/sql-tests/inputs/hll.sql b/sql/core/src/test/resources/sql-tests/inputs/hll.sql new file mode 100644 index 00000000000..a0c29cb25a5 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/hll.sql @@ -0,0 +1,76 @@ +-- Positive test cases +-- Create a table with some testing data. +DROP TABLE IF EXISTS t1; +CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col); + +SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1; + +SELECT hll_sketch_estimate(hll_sketch_agg(col, 12)) +FROM VALUES (50), (60), (60), (60), (75), (100) tab(col); + +SELECT hll_sketch_estimate(hll_sketch_agg(col)) +FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col); + +SELECT hll_sketch_estimate( + hll_union( + hll_sketch_agg(col1), + hll_sketch_agg(col2))) + FROM VALUES + (1, 4), + (1, 4), + (2, 5), + (2, 5), + (3, 6) AS tab(col1, col2); + +SELECT hll_sketch_estimate(hll_union_agg(sketch, true)) + FROM (SELECT hll_sketch_agg(col) as sketch + FROM VALUES (1) AS tab(col) + UNION ALL + SELECT hll_sketch_agg(col, 20) as sketch + FROM VALUES (1) AS tab(col)); + +-- Negative test cases +SELECT hll_sketch_agg(col) +FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)) tab(col); + +SELECT hll_sketch_agg(col, 2) +FROM VALUES (50), (60), (60) tab(col); + +SELECT hll_sketch_agg(col, 40) +FROM VALUES (50), (60), (60) tab(col); + +SELECT hll_union( + hll_sketch_agg(col1, 12), + hll_sketch_agg(col2, 13)) + FROM VALUES + (1, 4), + (1, 4), + (2, 5), + (2, 5), + (3, 6) AS tab(col1, col2); + +SELECT hll_union_agg(sketch, false) +FROM (SELECT hll_sketch_agg(col, 12) as sketch + FROM VALUES (1) AS 
tab(col) + UNION ALL + SELECT hll_sketch_agg(col, 20) as sketch + FROM VALUES (1) AS tab(col)); + +SELECT hll_union(1, 2) + FROM VALUES + (1, 4), + (1, 4), + (2, 5), + (2, 5), + (3, 6) AS tab(col1, col2); + +-- The HLL functions receive invalid buffers as inputs. +SELECT hll_sketch_estimate(CAST ('abc' AS BINARY)); + +SELECT hll_union(CAST ('abc' AS BINARY), CAST ('def' AS BINARY)); + +SELECT hll_union_agg(buffer, false) +FROM (SELECT CAST('abc' AS BINARY) AS buffer); + +-- Clean up +DROP TABLE IF EXISTS t1; diff --git a/sql/core/src/test/resources/sql-tests/results/hll.sql.out b/sql/core/src/test/resources/sql-tests/results/hll.sql.out new file mode 100644 index 00000000000..c8a2e9a2faf --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/hll.sql.out @@ -0,0 +1,262 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +DROP TABLE IF EXISTS t1 +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1 +-- !query schema +struct<result:bigint> +-- !query output +5 + + +-- !query +SELECT hll_sketch_estimate(hll_sketch_agg(col, 12)) +FROM VALUES (50), (60), (60), (60), (75), (100) tab(col) +-- !query schema +struct<hll_sketch_estimate(hll_sketch_agg(col, 12)):bigint> +-- !query output +4 + + +-- !query +SELECT hll_sketch_estimate(hll_sketch_agg(col)) +FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col) +-- !query schema +struct<hll_sketch_estimate(hll_sketch_agg(col, 12)):bigint> +-- !query output +3 + + +-- !query +SELECT hll_sketch_estimate( + hll_union( + hll_sketch_agg(col1), + hll_sketch_agg(col2))) + FROM VALUES + (1, 4), + (1, 4), + (2, 5), + (2, 5), + (3, 6) AS tab(col1, col2) +-- !query schema +struct<hll_sketch_estimate(hll_union(hll_sketch_agg(col1, 12), hll_sketch_agg(col2, 12), false)):bigint> +-- !query 
output +6 + + +-- !query +SELECT hll_sketch_estimate(hll_union_agg(sketch, true)) + FROM (SELECT hll_sketch_agg(col) as sketch + FROM VALUES (1) AS tab(col) + UNION ALL + SELECT hll_sketch_agg(col, 20) as sketch + FROM VALUES (1) AS tab(col)) +-- !query schema +struct<hll_sketch_estimate(hll_union_agg(sketch, true)):bigint> +-- !query output +1 + + +-- !query +SELECT hll_sketch_agg(col) +FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)) tab(col) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"col\"", + "inputType" : "\"ARRAY<INT>\"", + "paramIndex" : "1", + "requiredType" : "(\"INT\" or \"BIGINT\" or \"STRING\" or \"BINARY\")", + "sqlExpr" : "\"hll_sketch_agg(col, 12)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 26, + "fragment" : "hll_sketch_agg(col)" + } ] +} + + +-- !query +SELECT hll_sketch_agg(col, 2) +FROM VALUES (50), (60), (60) tab(col) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "HLL_INVALID_LG_K", + "messageParameters" : { + "function" : "`hll_sketch_agg`", + "max" : "21", + "min" : "4", + "value" : "2" + } +} + + +-- !query +SELECT hll_sketch_agg(col, 40) +FROM VALUES (50), (60), (60) tab(col) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "HLL_INVALID_LG_K", + "messageParameters" : { + "function" : "`hll_sketch_agg`", + "max" : "21", + "min" : "4", + "value" : "40" + } +} + + +-- !query +SELECT hll_union( + hll_sketch_agg(col1, 12), + hll_sketch_agg(col2, 13)) + FROM VALUES + (1, 4), + (1, 4), + (2, 5), + (2, 5), + (3, 6) AS tab(col1, col2) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "HLL_UNION_DIFFERENT_LG_K", + "messageParameters" : { + "function" : 
"`hll_union`", + "left" : "12", + "right" : "13" + } +} + + +-- !query +SELECT hll_union_agg(sketch, false) +FROM (SELECT hll_sketch_agg(col, 12) as sketch + FROM VALUES (1) AS tab(col) + UNION ALL + SELECT hll_sketch_agg(col, 20) as sketch + FROM VALUES (1) AS tab(col)) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "HLL_UNION_DIFFERENT_LG_K", + "messageParameters" : { + "function" : "`hll_union_agg`", + "left" : "12", + "right" : "20" + } +} + + +-- !query +SELECT hll_union(1, 2) + FROM VALUES + (1, 4), + (1, 4), + (2, 5), + (2, 5), + (3, 6) AS tab(col1, col2) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"1\"", + "inputType" : "\"INT\"", + "paramIndex" : "1", + "requiredType" : "\"BINARY\"", + "sqlExpr" : "\"hll_union(1, 2, false)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 22, + "fragment" : "hll_union(1, 2)" + } ] +} + + +-- !query +SELECT hll_sketch_estimate(CAST ('abc' AS BINARY)) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "HLL_INVALID_INPUT_SKETCH_BUFFER", + "messageParameters" : { + "function" : "`hll_sketch_estimate`" + } +} + + +-- !query +SELECT hll_union(CAST ('abc' AS BINARY), CAST ('def' AS BINARY)) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "HLL_INVALID_INPUT_SKETCH_BUFFER", + "messageParameters" : { + "function" : "`hll_union`" + } +} + + +-- !query +SELECT hll_union_agg(buffer, false) +FROM (SELECT CAST('abc' AS BINARY) AS buffer) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "HLL_INVALID_INPUT_SKETCH_BUFFER", + "messageParameters" : { + "function" : "`hll_union_agg`" + } +} + 
+ +-- !query +DROP TABLE IF EXISTS t1 +-- !query schema +struct<> +-- !query output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 577d86c2d9a..b6e1f5af011 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -23,7 +23,7 @@ import scala.util.Random import org.scalatest.matchers.must.Matchers.the -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkThrowable} import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} @@ -1855,149 +1855,211 @@ class DataFrameAggregateSuite extends QueryTest df2.createOrReplaceTempView("df2") // validate that the functions error out when lgConfigK < 4 or > 21 - val error0 = intercept[SparkException] { - val res = df1.groupBy("id") - .agg( - hll_sketch_agg("value", 1).as("hllsketch") - ) - checkAnswer(res, Nil) - } - assert(error0.toString contains "SketchesArgumentException") - - val error1 = intercept[SparkException] { - val res = df1.groupBy("id") - .agg( - hll_sketch_agg("value", 25).as("hllsketch") - ) - checkAnswer(res, Nil) - } - assert(error1.toString contains "SketchesArgumentException") + checkError( + exception = intercept[SparkException] { + val res = df1.groupBy("id") + .agg( + hll_sketch_agg("value", 1).as("hllsketch") + ) + checkAnswer(res, Nil) + }.getCause.asInstanceOf[SparkThrowable], + errorClass = "HLL_INVALID_LG_K", + parameters = Map( + "function" -> "`hll_sketch_agg`", + "min" -> "4", + "max" -> "21", + "value" -> "1" + )) + + checkError( + exception = intercept[SparkException] { + val res = df1.groupBy("id") + .agg( + hll_sketch_agg("value", 
25).as("hllsketch") + ) + checkAnswer(res, Nil) + }.getCause.asInstanceOf[SparkThrowable], + errorClass = "HLL_INVALID_LG_K", + parameters = Map( + "function" -> "`hll_sketch_agg`", + "min" -> "4", + "max" -> "21", + "value" -> "25" + )) // validate that unions error out by default for different lgConfigK sketches - val error2 = intercept[SparkException] { - val i1 = df1.groupBy("id") - .agg( - hll_sketch_agg("value").as("hllsketch_left") - ) - val i2 = df2.groupBy("id") - .agg( - hll_sketch_agg("value", 20).as("hllsketch_right") - ) - val res = i1.join(i2).withColumn("union", hll_union("hllsketch_left", "hllsketch_right")) - checkAnswer(res, Nil) - } - assert(error2.toString contains "UnsupportedOperationException") - - val error3 = intercept[SparkException] { - val i1 = df1.groupBy("id") - .agg( - hll_sketch_agg("value").as("hllsketch") - ) - val i2 = df2.groupBy("id") - .agg( - hll_sketch_agg("value", 20).as("hllsketch") - ) - val res = i1.union(i2).groupBy("id") - .agg( - hll_union_agg("hllsketch") - ) - checkAnswer(res, Nil) - } - assert(error3.toString contains "UnsupportedOperationException") + checkError( + exception = intercept[SparkException] { + val i1 = df1.groupBy("id") + .agg( + hll_sketch_agg("value").as("hllsketch_left") + ) + val i2 = df2.groupBy("id") + .agg( + hll_sketch_agg("value", 20).as("hllsketch_right") + ) + val res = i1.join(i2).withColumn("union", hll_union("hllsketch_left", "hllsketch_right")) + checkAnswer(res, Nil) + }.getCause.asInstanceOf[SparkThrowable], + errorClass = "HLL_UNION_DIFFERENT_LG_K", + parameters = Map( + "left" -> "12", + "right" -> "20", + "function" -> "`hll_union`" + )) + + checkError( + exception = intercept[SparkException] { + val i1 = df1.groupBy("id") + .agg( + hll_sketch_agg("value").as("hllsketch") + ) + val i2 = df2.groupBy("id") + .agg( + hll_sketch_agg("value", 20).as("hllsketch") + ) + val res = i1.union(i2).groupBy("id") + .agg( + hll_union_agg("hllsketch") + ) + checkAnswer(res, Nil) + 
}.getCause.asInstanceOf[SparkThrowable], + errorClass = "HLL_UNION_DIFFERENT_LG_K", + parameters = Map( + "left" -> "12", + "right" -> "20", + "function" -> "`hll_union_agg`" + )) // validate that the functions error out when provided unexpected types - val error4 = intercept[AnalysisException] { - val res = sql( - """ - |select - | id, - | hll_sketch_agg(value, 'text') - |from - | df1 - |group by 1 - |""".stripMargin) - checkAnswer(res, Nil) - } - assert(error4.toString contains "UNEXPECTED_INPUT_TYPE") - - val error5 = intercept[AnalysisException] { - val res = sql( - """with sketch_cte as ( - |select - | id, - | hll_sketch_agg(value) as sketch - |from - | df1 - |group by 1 - |) - | - |select hll_union_agg(sketch, 'Hll_4') from sketch_cte - |""".stripMargin) - checkAnswer(res, Nil) - } - assert(error5.toString contains "UNEXPECTED_INPUT_TYPE") + checkError( + exception = intercept[AnalysisException] { + val res = sql( + """ + |select + | id, + | hll_sketch_agg(value, 'text') + |from + | df1 + |group by 1 + |""".stripMargin) + checkAnswer(res, Nil) + }, + errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + parameters = Map( + "sqlExpr" -> "\"hll_sketch_agg(value, text)\"", + "paramIndex" -> "2", + "inputSql" -> "\"text\"", + "inputType" -> "\"STRING\"", + "requiredType" -> "\"INT\"" + ), + context = ExpectedContext( + fragment = "hll_sketch_agg(value, 'text')", + start = 14, + stop = 42)) + + checkError( + exception = intercept[AnalysisException] { + val res = sql( + """with sketch_cte as ( + |select + | id, + | hll_sketch_agg(value) as sketch + |from + | df1 + |group by 1 + |) + | + |select hll_union_agg(sketch, 'Hll_4') from sketch_cte + |""".stripMargin) + checkAnswer(res, Nil) + }, + errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + parameters = Map( + "sqlExpr" -> "\"hll_union_agg(sketch, Hll_4)\"", + "paramIndex" -> "2", + "inputSql" -> "\"Hll_4\"", + "inputType" -> "\"STRING\"", + "requiredType" -> "\"BOOLEAN\"" + ), + context = 
ExpectedContext( + fragment = "hll_union_agg(sketch, 'Hll_4')", + start = 97, + stop = 126)) // validate that unions error out by default for different lgConfigK sketches - val error6 = intercept[SparkException] { - val res = sql( - """with cte1 as ( - |select - | id, - | hll_sketch_agg(value) as sketch - |from - | df1 - |group by 1 - |), - | - |cte2 as ( - |select - | id, - | hll_sketch_agg(value, 20) as sketch - |from - | df2 - |group by 1 - |) - | - |select - | cte1.id, - | hll_union(cte1.sketch, cte2.sketch) as sketch - |from - | cte1 join cte2 on cte1.id = cte2.id - |""".stripMargin) - checkAnswer(res, Nil) - } - assert(error6.toString contains "UnsupportedOperationException") - - val error7 = intercept[SparkException] { - val res = sql( - """with cte1 as ( - |select - | id, - | hll_sketch_agg(value) as sketch - |from - | df1 - |group by 1 - |), - | - |cte2 as ( - |select - | id, - | hll_sketch_agg(value, 20) as sketch - |from - | df2 - |group by 1 - |) - | - |select - | id, - | hll_union_agg(sketch) as sketch - |from - | (select * from cte1 union all select * from cte2) - |group by 1 - |""".stripMargin) - checkAnswer(res, Nil) - } - assert(error7.toString contains "UnsupportedOperationException") + checkError( + exception = intercept[SparkException] { + val res = sql( + """with cte1 as ( + |select + | id, + | hll_sketch_agg(value) as sketch + |from + | df1 + |group by 1 + |), + | + |cte2 as ( + |select + | id, + | hll_sketch_agg(value, 20) as sketch + |from + | df2 + |group by 1 + |) + | + |select + | cte1.id, + | hll_union(cte1.sketch, cte2.sketch) as sketch + |from + | cte1 join cte2 on cte1.id = cte2.id + |""".stripMargin) + checkAnswer(res, Nil) + }.getCause.asInstanceOf[SparkThrowable], + errorClass = "HLL_UNION_DIFFERENT_LG_K", + parameters = Map( + "left" -> "12", + "right" -> "20", + "function" -> "`hll_union`" + )) + + checkError( + exception = intercept[SparkException] { + val res = sql( + """with cte1 as ( + |select + | id, + | 
hll_sketch_agg(value) as sketch + |from + | df1 + |group by 1 + |), + | + |cte2 as ( + |select + | id, + | hll_sketch_agg(value, 20) as sketch + |from + | df2 + |group by 1 + |) + | + |select + | id, + | hll_union_agg(sketch) as sketch + |from + | (select * from cte1 union all select * from cte2) + |group by 1 + |""".stripMargin) + checkAnswer(res, Nil) + }.getCause.asInstanceOf[SparkThrowable], + errorClass = "HLL_UNION_DIFFERENT_LG_K", + parameters = Map( + "left" -> "12", + "right" -> "20", + "function" -> "`hll_union_agg`" + )) } test("SPARK-43876: Enable fast hashmap for distinct queries") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org