dtenedor commented on code in PR #52883:
URL: https://github.com/apache/spark/pull/52883#discussion_r2710269720
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -6158,13 +6164,7 @@
"message" : [
"Invalid call to <function>; only valid Theta sketch buffers are
supported as inputs (such as those produced by the `theta_sketch_agg`
function)."
],
- "sqlState" : "22546"
- },
- "THETA_INVALID_LG_NOM_ENTRIES" : {
- "message" : [
- "Invalid call to <function>; the `lgNomEntries` value must be between
<min> and <max>, inclusive: <value>."
- ],
- "sqlState" : "22546"
+ "sqlState" : "22000"
Review Comment:
Is this SQL state change intentional?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala:
##########
@@ -816,6 +822,24 @@ object FunctionRegistry {
expression[ThetaDifference]("theta_difference"),
expression[ThetaIntersection]("theta_intersection"),
expression[ApproxTopKEstimate]("approx_top_k_estimate"),
+ expression[TupleSketchEstimateDouble]("tuple_sketch_estimate_double"),
+ expression[TupleSketchEstimateInteger]("tuple_sketch_estimate_integer"),
+ expression[TupleSketchThetaDouble]("tuple_sketch_theta_double"),
+ expression[TupleSketchThetaInteger]("tuple_sketch_theta_integer"),
+ expression[TupleSketchSummaryDouble]("tuple_sketch_summary_double"),
Review Comment:
I recommend sorting these new functions alphabetically.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/tupleSketchAgg.scala:
##########
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.tuple.{Sketch, SummaryFactory,
SummarySetOperations, Union, UpdatableSketchBuilder, UpdatableSummary}
+import org.apache.datasketches.tuple.adouble.{DoubleSummary,
DoubleSummaryFactory, DoubleSummarySetOperations}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary,
IntegerSummaryFactory, IntegerSummarySetOperations}
+
+import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder,
TypeCheckResult}
+import org.apache.spark.sql.catalyst.expressions.{Expression,
ExpressionDescription, ImplicitCastInputTypes, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature,
InputParameter}
+import org.apache.spark.sql.catalyst.trees.QuaternaryLike
+import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory,
SketchSize, SummaryAggregateMode, ThetaSketchUtils, TupleSketchUtils,
TupleSummaryMode}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.{AbstractDataType, ArrayType, BinaryType,
DataType, DoubleType, FloatType, IntegerType, LongType, StringType,
TypeCollection}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The TupleSketchAggDouble function utilizes a Datasketches TupleSketch
instance to count a
+ * probabilistic approximation of the number of unique values in a given
column with associated
+ * double type summary values that can be aggregated using different modes
(sum, min, max,
+ * alwaysone), and outputs the binary representation of the TupleSketch.
+ *
+ * Keys are hashed internally based on their type and value - the same logical
value in different
+ * types (e.g., String("123") and Int(123)) will be treated as distinct keys.
However, summary
+ * value types must be consistent across all calls; mixing types can produce
incorrect results or
+ * precision loss. The value type suffix in the function name (e.g., _double)
ensures type safety.
+ *
+ * See [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]] for
more information.
+ *
+ * @param key
+ * key expression against which unique counting will occur
+ * @param summary
+ * summary expression (double type) against which different mode
aggregations will occur
+ * @param lgNomEntries
+ * the log-base-2 of nomEntries decides the number of buckets for the sketch
+ * @param mode
+ * the aggregation mode for numeric summaries (sum, min, max, alwaysone)
+ * @param mutableAggBufferOffset
+ * offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ * offset for input aggregation buffer
+ */
+case class TupleSketchAggDouble(
+    key: Expression,
+    summary: Expression,
+    lgNomEntries: Expression,
+    mode: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int)
+  extends TupleSketchAggBase[java.lang.Double, DoubleSummary]
+  with QuaternaryLike[Expression] {
+
+  // Auxiliary constructors. Each one delegates to the one declared just above it:
+  //   (key, summary, lgNomEntries, mode) -> both buffer offsets are 0
+  //   (key, summary, lgNomEntries)       -> mode is "sum"
+  //   (key, summary)                     -> lgNomEntries is DEFAULT_LG_NOM_LONGS
+  def this(key: Expression, summary: Expression, lgNomEntries: Expression, mode: Expression) =
+    this(key, summary, lgNomEntries, mode, 0, 0)
+
+  def this(key: Expression, summary: Expression, lgNomEntries: Expression) =
+    this(key, summary, lgNomEntries, Literal(TupleSummaryMode.Sum.toString))
+
+  def this(key: Expression, summary: Expression) =
+    this(key, summary, Literal(ThetaSketchUtils.DEFAULT_LG_NOM_LONGS))
+
+  // QuaternaryLike children, in constructor order.
+  override def first: Expression = key
+  override def second: Expression = summary
+  override def third: Expression = lgNomEntries
+  override def fourth: Expression = mode
+
+  override protected def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression,
+      newFourth: Expression): TupleSketchAggDouble =
+    copy(key = newFirst, summary = newSecond, lgNomEntries = newThird, mode = newFourth)
+
+  // Buffer-offset copies required by ImperativeAggregate.
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): TupleSketchAggDouble =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): TupleSketchAggDouble =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def prettyName: String = "tuple_sketch_agg_double"
+
+  /** Summary values must be of DoubleType. */
+  override protected def summaryInputType: AbstractDataType = DoubleType
+
+  /** Builds per-key [[DoubleSummary]] values using the configured aggregation mode. */
+  override protected def createSummaryFactory(): SummaryFactory[DoubleSummary] =
+    new DoubleSummaryFactory(modeEnum.toDoubleSummaryMode)
+
+  /** Summary set operations used when merging sketches, using the configured mode. */
+  override protected def createSummarySetOperations(): SummarySetOperations[DoubleSummary] =
+    new DoubleSummarySetOperations(modeEnum.toDoubleSummaryMode)
+
+  /** Rebuilds a double-summary tuple sketch from its serialized bytes. */
+  override protected def heapifySketch(buffer: Array[Byte]): Sketch[DoubleSummary] =
+    TupleSketchUtils.heapifyDoubleSketch(buffer, prettyName)
+}
+
+/**
+ * The TupleSketchAggInteger function utilizes a Datasketches TupleSketch
instance to count a
+ * probabilistic approximation of the number of unique values in a given
column with associated
+ * integer type summary values that can be aggregated using different modes
(sum, min, max,
+ * alwaysone), and outputs the binary representation of the TupleSketch.
+ *
+ * Keys are hashed internally based on their type and value - the same logical
value in different
+ * types (e.g., String("123") and Int(123)) will be treated as distinct keys.
However, summary
+ * value types must be consistent across all calls; mixing types can produce
incorrect results or
+ * precision loss. The value type suffix in the function name (e.g., _integer)
ensures type safety.
+ *
+ * See [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]] for
more information.
+ *
+ * @param key
+ * key expression against which unique counting will occur
+ * @param summary
+ * summary expression (integer type) against which different mode
aggregations will occur
+ * @param lgNomEntries
+ * the log-base-2 of nomEntries decides the number of buckets for the sketch
+ * @param mode
+ * the aggregation mode for numeric summaries (sum, min, max, alwaysone)
+ * @param mutableAggBufferOffset
+ * offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ * offset for input aggregation buffer
+ */
+case class TupleSketchAggInteger(
+    key: Expression,
+    summary: Expression,
+    lgNomEntries: Expression,
+    mode: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int)
+  extends TupleSketchAggBase[Integer, IntegerSummary]
+  with QuaternaryLike[Expression] {
+
+  // Auxiliary constructors. Each one delegates to the one declared just above it:
+  //   (key, summary, lgNomEntries, mode) -> both buffer offsets are 0
+  //   (key, summary, lgNomEntries)       -> mode is "sum"
+  //   (key, summary)                     -> lgNomEntries is DEFAULT_LG_NOM_LONGS
+  def this(key: Expression, summary: Expression, lgNomEntries: Expression, mode: Expression) =
+    this(key, summary, lgNomEntries, mode, 0, 0)
+
+  def this(key: Expression, summary: Expression, lgNomEntries: Expression) =
+    this(key, summary, lgNomEntries, Literal(TupleSummaryMode.Sum.toString))
+
+  def this(key: Expression, summary: Expression) =
+    this(key, summary, Literal(ThetaSketchUtils.DEFAULT_LG_NOM_LONGS))
+
+  // QuaternaryLike children, in constructor order.
+  override def first: Expression = key
+  override def second: Expression = summary
+  override def third: Expression = lgNomEntries
+  override def fourth: Expression = mode
+
+  override protected def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression,
+      newFourth: Expression): TupleSketchAggInteger =
+    copy(key = newFirst, summary = newSecond, lgNomEntries = newThird, mode = newFourth)
+
+  // Buffer-offset copies required by ImperativeAggregate.
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): TupleSketchAggInteger =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): TupleSketchAggInteger =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def prettyName: String = "tuple_sketch_agg_integer"
+
+  /** Summary values must be of IntegerType. */
+  override protected def summaryInputType: AbstractDataType = IntegerType
+
+  /** Builds per-key [[IntegerSummary]] values using the configured aggregation mode. */
+  override protected def createSummaryFactory(): SummaryFactory[IntegerSummary] =
+    new IntegerSummaryFactory(modeEnum.toIntegerSummaryMode)
+
+  /**
+   * Summary set operations used when merging sketches. The configured mode is passed
+   * as both constructor arguments of IntegerSummarySetOperations.
+   */
+  override protected def createSummarySetOperations(): SummarySetOperations[IntegerSummary] = {
+    // Named so it does not shadow the `mode` child expression.
+    val integerMode = modeEnum.toIntegerSummaryMode
+    new IntegerSummarySetOperations(integerMode, integerMode)
+  }
+
+  /** Rebuilds an integer-summary tuple sketch from its serialized bytes. */
+  override protected def heapifySketch(buffer: Array[Byte]): Sketch[IntegerSummary] =
+    TupleSketchUtils.heapifyIntegerSketch(buffer, prettyName)
+}
+
+abstract class TupleSketchAggBase[U, S <: UpdatableSummary[U]]
+ extends TypedImperativeAggregate[TupleSketchState[S]]
+ with SketchSize
+ with SummaryAggregateMode
+ with ImplicitCastInputTypes {
+
+ // Abstract methods that subclasses must implement
+ protected def summaryInputType: AbstractDataType
+ protected def createSummaryFactory(): SummaryFactory[S]
+ protected def createSummarySetOperations(): SummarySetOperations[S]
+ protected def heapifySketch(buffer: Array[Byte]): Sketch[S]
+
+ // Abstract members that subclasses must implement
+ protected def key: Expression
+ protected def summary: Expression
+
+ // Type and input validation overrides
+ protected final val keyInputTypes: AbstractDataType =
+ TypeCollection(
+ ArrayType(IntegerType),
+ ArrayType(LongType),
+ BinaryType,
+ DoubleType,
+ FloatType,
+ IntegerType,
+ LongType,
+ StringTypeWithCollation(supportsTrimCollation = true))
+
+ override def dataType: DataType = BinaryType
+ override def nullable: Boolean = false
+
+ override def inputTypes: Seq[AbstractDataType] =
+ Seq(
+ keyInputTypes,
+ summaryInputType,
+ IntegerType,
+ StringTypeWithCollation(supportsTrimCollation = true))
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ val defaultCheck = super.checkInputDataTypes()
+ val lgCheck = checkLgNomEntriesParameter()
+
+ if (defaultCheck.isFailure) {
+ defaultCheck
+ } else if (lgCheck.isFailure) {
+ lgCheck
+ } else {
+ checkModeParameter()
+ }
+ }
+
+ /**
+ * Instantiate an UpdatableSketch instance using the lgNomEntries param and
summary factory.
+ *
+ * @return
+ * an UpdatableSketch instance wrapped with UpdatableTupleSketchBuffer
+ */
+ override def createAggregationBuffer(): TupleSketchState[S] = {
+ val factory = createSummaryFactory()
+ val builder = new UpdatableSketchBuilder[U, S](factory)
+ builder.setNominalEntries(1 << lgNomEntriesInput)
+ val sketch = builder.build()
+ UpdatableTupleSketchBuffer(sketch)
+ }
+
+ /**
+ * Evaluate the input row and update the UpdatableSketch instance with the
row's key and summary
+ * value. The update function only supports a subset of Spark SQL types, and
an exception will
+ * be thrown for unsupported types. Notes:
+ * - Null values are ignored.
+ * - Empty byte arrays are ignored
+ * - Empty arrays of supported element types are ignored
+ * - Strings that are collation-equal to the empty string are ignored.
+ *
+ * @param updateBuffer
+ * A previously initialized UpdatableSketch instance
+ * @param input
+ * An input row
+ */
+ override def update(
+ updateBuffer: TupleSketchState[S],
+ input: InternalRow): TupleSketchState[S] = {
+ val keyValue = key.eval(input)
+ val summaryValue = summary.eval(input)
+
+ // Return early for null values.
+ if (keyValue == null || summaryValue == null) {
+ updateBuffer
+ } else {
+ // Type checking is already done by ImplicitCastInputTypes.
+ val normalizedSummary = summaryValue.asInstanceOf[U]
+
+ // Initialized buffer should be UpdatableTupleSketchBuffer, else error
out.
+ val sketch = updateBuffer match {
+ case UpdatableTupleSketchBuffer(s) => s
+ case _ => throw
QueryExecutionErrors.tupleInvalidInputSketchBuffer(prettyName)
+ }
+
+ key.dataType match {
+ case ArrayType(IntegerType, _) =>
+ val arr = keyValue.asInstanceOf[ArrayData].toIntArray()
+ sketch.update(arr, normalizedSummary)
+ case ArrayType(LongType, _) =>
+ val arr = keyValue.asInstanceOf[ArrayData].toLongArray()
+ sketch.update(arr, normalizedSummary)
+ case BinaryType =>
+ val bytes = keyValue.asInstanceOf[Array[Byte]]
+ sketch.update(bytes, normalizedSummary)
+ case DoubleType =>
+ sketch.update(keyValue.asInstanceOf[Double], normalizedSummary)
Review Comment:
I wonder if it would simplify the code to pattern-match against both
`(key.dataType, keyValue)`? Then you could have a case like `case (DoubleType,
v: Double)` and then just use `v` directly, dropping all the `asInstanceOf`s.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TupleSketchUtils.scala:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.util.Locale
+
+import org.apache.datasketches.memory.{Memory, MemoryBoundsException}
+import org.apache.datasketches.tuple.{Sketch, Sketches, Summary,
TupleSketchIterator}
+import org.apache.datasketches.tuple.adouble.{DoubleSummary,
DoubleSummaryDeserializer}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary,
IntegerSummaryDeserializer}
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Sealed trait representing valid summary modes for tuple sketches. This
provides type-safe
+ * mode handling with compile-time exhaustiveness checking and prevents
invalid modes from
+ * being created.
+ */
+sealed trait TupleSummaryMode {
+  /** The equivalent DataSketches mode for double summaries. */
+  def toDoubleSummaryMode: DoubleSummary.Mode
+
+  /** The equivalent DataSketches mode for integer summaries. */
+  def toIntegerSummaryMode: IntegerSummary.Mode
+
+  /** The lower-case SQL spelling of the mode; this is also what [[TupleSummaryMode.fromString]] accepts. */
+  def toString: String
+}
+
+object TupleSummaryMode {
+  case object Sum extends TupleSummaryMode {
+    def toDoubleSummaryMode: DoubleSummary.Mode = DoubleSummary.Mode.Sum
+    def toIntegerSummaryMode: IntegerSummary.Mode = IntegerSummary.Mode.Sum
+    override def toString: String = "sum"
+  }
+
+  case object Min extends TupleSummaryMode {
+    def toDoubleSummaryMode: DoubleSummary.Mode = DoubleSummary.Mode.Min
+    def toIntegerSummaryMode: IntegerSummary.Mode = IntegerSummary.Mode.Min
+    override def toString: String = "min"
+  }
+
+  case object Max extends TupleSummaryMode {
+    def toDoubleSummaryMode: DoubleSummary.Mode = DoubleSummary.Mode.Max
+    def toIntegerSummaryMode: IntegerSummary.Mode = IntegerSummary.Mode.Max
+    override def toString: String = "max"
+  }
+
+  case object AlwaysOne extends TupleSummaryMode {
+    def toDoubleSummaryMode: DoubleSummary.Mode = DoubleSummary.Mode.AlwaysOne
+    def toIntegerSummaryMode: IntegerSummary.Mode = IntegerSummary.Mode.AlwaysOne
+    override def toString: String = "alwaysone"
+  }
+
+  /** All valid modes. This is the single list that parsing and error messages derive from. */
+  val validModes: Seq[TupleSummaryMode] = Seq(Sum, Min, Max, AlwaysOne)
+
+  /** String representations of valid modes for error messages. */
+  val validModeStrings: Seq[String] = validModes.map(_.toString)
+
+  /**
+   * Parses a string into a TupleSummaryMode. This is the single entry point for
+   * string-to-mode conversion. Matching is case-insensitive (Locale.ROOT) against each
+   * mode's `toString`, so the accepted spellings cannot drift from the mode definitions.
+   *
+   * @param s The mode string to parse
+   * @param functionName The display name of the function/expression for error messages
+   * @return The corresponding TupleSummaryMode
+   * @throws Throwable the error built by `QueryExecutionErrors.tupleInvalidMode` when `s`
+   *                   does not name a valid mode
+   */
+  def fromString(s: String, functionName: String): TupleSummaryMode = {
+    val normalized = s.toLowerCase(Locale.ROOT)
+    validModes
+      .find(_.toString == normalized)
+      .getOrElse(throw QueryExecutionErrors.tupleInvalidMode(functionName, s, validModeStrings))
+  }
+}
+
+/**
+ * Trait for TupleSketch aggregation functions that use the lgNomEntries
parameter. Provides
+ * validation and extraction functionality for the log-base-2 of nominal
entries.
+ */
+trait SketchSize extends AggregateFunction {
+
+ /** log-base-2 of nominal entries (determines sketch size). */
+ def lgNomEntries: Expression
+
+ /** Returns the pretty name of the aggregation function for error messages.
*/
+ protected def prettyName: String
+
+ /**
+ * Validates that lgNomEntries parameter is a constant and within valid
range (4-26).
+ */
+ protected def checkLgNomEntriesParameter(): TypeCheckResult = {
+ if (!lgNomEntries.foldable) {
+ DataTypeMismatch(
+ errorSubClass = "NON_FOLDABLE_INPUT",
+ messageParameters = Map(
+ "inputName" -> "lgNomEntries",
+ "inputType" -> "int",
+ "inputExpr" -> lgNomEntries.sql))
+ } else if (lgNomEntries.eval() == null) {
+ DataTypeMismatch(
+ errorSubClass = "UNEXPECTED_NULL",
+ messageParameters = Map("exprName" -> "lgNomEntries"))
+ } else {
+ try {
+ ThetaSketchUtils.checkLgNomLongs(lgNomEntriesInput, prettyName)
+ TypeCheckResult.TypeCheckSuccess
+ } catch {
+ case e: Exception =>
+ TypeCheckResult.TypeCheckFailure(e.getMessage)
Review Comment:
Do you think this could leak obscure offsets or other information into the
error message? Do we have test coverage for this? Same for L196 below.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/tupleSketchEstimate.scala:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.TupleSketchUtils
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType,
DoubleType}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(child) - Returns the estimated number of unique values
+ given the binary representation of a Datasketches TupleSketch. The sketch's
+ summary type must be a double. """,
+ examples = """
+ Examples:
+ > SELECT _FUNC_(tuple_sketch_agg_double(key, summary)) FROM VALUES (1,
1.0D), (1, 2.0D), (2, 3.0D) tab(key, summary);
+ 2.0
+ """,
+ group = "misc_funcs",
+ since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleSketchEstimateDouble(child: Expression)
+ extends UnaryExpression
+ with CodegenFallback
+ with ExpectsInputTypes {
+
+ override def nullIntolerant: Boolean = true
+
Review Comment:
Super nit: I think you can remove these blank lines between methods and just
group all the lines in each case class together, with blank lines separating the
case classes.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/tupleIntersection.scala:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.datasketches.tuple.{Intersection, Summary,
SummarySetOperations}
+import org.apache.datasketches.tuple.adouble.{DoubleSummary,
DoubleSummaryFactory, DoubleSummarySetOperations}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary,
IntegerSummaryFactory, IntegerSummarySetOperations}
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ThetaSketchUtils, TupleSketchUtils,
TupleSummaryMode}
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
+import org.apache.spark.unsafe.types.UTF8String
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(tupleSketch1, tupleSketch2, mode) - Intersects two binary
representations of Datasketches
+ TupleSketch objects with double summary data type using a TupleSketch
Intersection object.
+ Users can set mode to 'sum', 'min', 'max', or 'alwaysone' (defaults to
'sum'). """,
+ examples = """
+ Examples:
+ > SELECT
tuple_sketch_estimate_double(_FUNC_(tuple_sketch_agg_double(col1, val1),
tuple_sketch_agg_double(col2, val2))) FROM VALUES (1, 1.0D, 1, 4.0D), (2, 2.0D,
2, 5.0D), (3, 3.0D, 4, 6.0D) tab(col1, val1, col2, val2);
+ 2.0
+ """,
+ group = "misc_funcs",
+ since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleIntersectionDouble(first: Expression, second: Expression, third: Expression)
+  extends TupleIntersectionBase[DoubleSummary] {
+
+  /** Two-argument form: the summary mode defaults to "sum". */
+  def this(first: Expression, second: Expression) =
+    this(first, second, Literal(TupleSummaryMode.Sum.toString))
+
+  override def prettyName: String = "tuple_intersection_double"
+
+  override def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression): TupleIntersectionDouble =
+    copy(first = newFirst, second = newSecond, third = newThird)
+
+  /** Double-summary set operations for the requested mode. */
+  override protected def createSummarySetOperations(
+      mode: TupleSummaryMode): SummarySetOperations[DoubleSummary] =
+    new DoubleSummarySetOperations(mode.toDoubleSummaryMode)
+
+  /**
+   * Deserializes both tuple sketches, then feeds them into `intersection` in order.
+   * `mode` is not used here; the set operations already carry it.
+   */
+  override protected def intersectSketches(
+      sketch1Bytes: Array[Byte],
+      sketch2Bytes: Array[Byte],
+      intersection: Intersection[DoubleSummary],
+      mode: TupleSummaryMode): Unit = {
+    // Both inputs are deserialized before either is intersected.
+    val sketches = Seq(sketch1Bytes, sketch2Bytes)
+      .map(bytes => TupleSketchUtils.heapifyDoubleSketch(bytes, prettyName))
+    sketches.foreach(sketch => intersection.intersect(sketch))
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(tupleSketch, thetaSketch, mode) - Intersects the binary
representation of a
+ Datasketches TupleSketch with double summary data type with the binary
representation of a
+ Datasketches ThetaSketch using a TupleSketch Intersection object. The
ThetaSketch entries are
+ assigned a default double summary value. Users can set mode to 'sum',
'min', 'max', or 'alwaysone'
+ (defaults to 'sum'). """,
+ examples = """
+ Examples:
+ > SELECT
tuple_sketch_estimate_double(_FUNC_(tuple_sketch_agg_double(col1, val1),
theta_sketch_agg(col2))) FROM VALUES (1, 1.0D, 1), (2, 2.0D, 2), (3, 3.0D, 4)
tab(col1, val1, col2);
+ 2.0
+ """,
+ group = "misc_funcs",
+ since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleIntersectionThetaDouble(first: Expression, second: Expression, third: Expression)
+  extends TupleIntersectionBase[DoubleSummary] {
+
+  /** Two-argument form: the summary mode defaults to "sum". */
+  def this(first: Expression, second: Expression) =
+    this(first, second, Literal(TupleSummaryMode.Sum.toString))
+
+  override def prettyName: String = "tuple_intersection_theta_double"
+
+  override def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression): TupleIntersectionThetaDouble =
+    copy(first = newFirst, second = newSecond, third = newThird)
+
+  /** Double-summary set operations for the requested mode. */
+  override protected def createSummarySetOperations(
+      mode: TupleSummaryMode): SummarySetOperations[DoubleSummary] =
+    new DoubleSummarySetOperations(mode.toDoubleSummaryMode)
+
+  /**
+   * Intersects a tuple sketch (first input) with a theta sketch (second input).
+   * The summary passed alongside the theta sketch is a new summary from a
+   * DoubleSummaryFactory configured with `mode`.
+   */
+  override protected def intersectSketches(
+      sketch1Bytes: Array[Byte],
+      sketch2Bytes: Array[Byte],
+      intersection: Intersection[DoubleSummary],
+      mode: TupleSummaryMode): Unit = {
+    val tupleSide = TupleSketchUtils.heapifyDoubleSketch(sketch1Bytes, prettyName)
+    val thetaSide = ThetaSketchUtils.wrapCompactSketch(sketch2Bytes, prettyName)
+    // Theta sketches carry no summaries; this supplies the default value for their entries.
+    val thetaSummary = new DoubleSummaryFactory(mode.toDoubleSummaryMode).newSummary()
+
+    intersection.intersect(tupleSide)
+    intersection.intersect(thetaSide, thetaSummary)
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(tupleSketch1, tupleSketch2, mode) - Intersects two binary
representations of Datasketches
+ TupleSketch objects with integer summary data type using a TupleSketch
Intersection object.
+ Users can set mode to 'sum', 'min', 'max', or 'alwaysone' (defaults to
'sum'). """,
+ examples = """
+ Examples:
+ > SELECT
tuple_sketch_estimate_integer(_FUNC_(tuple_sketch_agg_integer(col1, val1),
tuple_sketch_agg_integer(col2, val2))) FROM VALUES (1, 1, 1, 4), (2, 2, 2, 5),
(3, 3, 4, 6) tab(col1, val1, col2, val2);
+ 2.0
+ """,
+ group = "misc_funcs",
+ since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleIntersectionInteger(first: Expression, second: Expression, third: Expression)
+  extends TupleIntersectionBase[IntegerSummary] {
+
+  /** Two-argument form: the summary mode defaults to "sum". */
+  def this(first: Expression, second: Expression) =
+    this(first, second, Literal(TupleSummaryMode.Sum.toString))
+
+  override def prettyName: String = "tuple_intersection_integer"
+
+  override def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression): TupleIntersectionInteger =
+    copy(first = newFirst, second = newSecond, third = newThird)
+
+  /** Integer-summary set operations; the mode is passed as both constructor arguments. */
+  override protected def createSummarySetOperations(
+      mode: TupleSummaryMode): SummarySetOperations[IntegerSummary] = {
+    val intMode = mode.toIntegerSummaryMode
+    new IntegerSummarySetOperations(intMode, intMode)
+  }
+
+  /**
+   * Deserializes both tuple sketches, then feeds them into `intersection` in order.
+   * `mode` is not used here; the set operations already carry it.
+   */
+  override protected def intersectSketches(
+      sketch1Bytes: Array[Byte],
+      sketch2Bytes: Array[Byte],
+      intersection: Intersection[IntegerSummary],
+      mode: TupleSummaryMode): Unit = {
+    // Both inputs are deserialized before either is intersected.
+    val sketches = Seq(sketch1Bytes, sketch2Bytes)
+      .map(bytes => TupleSketchUtils.heapifyIntegerSketch(bytes, prettyName))
+    sketches.foreach(sketch => intersection.intersect(sketch))
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(tupleSketch, thetaSketch, mode) - Intersects the binary
representation of a
+ Datasketches TupleSketch with integer summary data type with the binary
representation of a
+ Datasketches ThetaSketch using a TupleSketch Intersection object. The
ThetaSketch entries are
+ assigned a default integer summary value. Users can set mode to 'sum',
'min', 'max', or 'alwaysone'
+ (defaults to 'sum'). """,
+ examples = """
+ Examples:
+ > SELECT
tuple_sketch_estimate_integer(_FUNC_(tuple_sketch_agg_integer(col1, val1),
theta_sketch_agg(col2))) FROM VALUES (1, 1, 1), (2, 2, 2), (3, 3, 4) tab(col1,
val1, col2);
+ 2.0
+ """,
+ group = "misc_funcs",
+ since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleIntersectionThetaInteger(first: Expression, second: Expression, third: Expression)
+  extends TupleIntersectionBase[IntegerSummary] {
+
+  /** Two-argument form: the summary mode defaults to "sum". */
+  def this(first: Expression, second: Expression) =
+    this(first, second, Literal(TupleSummaryMode.Sum.toString))
+
+  override def prettyName: String = "tuple_intersection_theta_integer"
+
+  override def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression): TupleIntersectionThetaInteger =
+    copy(first = newFirst, second = newSecond, third = newThird)
+
+  /** Integer-summary set operations; the mode is passed as both constructor arguments. */
+  override protected def createSummarySetOperations(
+      mode: TupleSummaryMode): SummarySetOperations[IntegerSummary] = {
+    val intMode = mode.toIntegerSummaryMode
+    new IntegerSummarySetOperations(intMode, intMode)
+  }
+
+  /**
+   * Intersects a tuple sketch (first input) with a theta sketch (second input).
+   * The summary passed alongside the theta sketch is a new summary from an
+   * IntegerSummaryFactory configured with `mode`.
+   */
+  override protected def intersectSketches(
+      sketch1Bytes: Array[Byte],
+      sketch2Bytes: Array[Byte],
+      intersection: Intersection[IntegerSummary],
+      mode: TupleSummaryMode): Unit = {
+    val tupleSide = TupleSketchUtils.heapifyIntegerSketch(sketch1Bytes, prettyName)
+    val thetaSide = ThetaSketchUtils.wrapCompactSketch(sketch2Bytes, prettyName)
+    // Theta sketches carry no summaries; this supplies the default value for their entries.
+    val thetaSummary = new IntegerSummaryFactory(mode.toIntegerSummaryMode).newSummary()
+
+    intersection.intersect(tupleSide)
+    intersection.intersect(thetaSide, thetaSummary)
+  }
+}
+
+abstract class TupleIntersectionBase[S <: Summary]
+ extends TernaryExpression
+ with CodegenFallback
+ with ExpectsInputTypes {
+
+ override def nullIntolerant: Boolean = true
+
+ override def inputTypes: Seq[AbstractDataType] =
+ Seq(BinaryType, BinaryType, StringTypeWithCollation(supportsTrimCollation
= true))
+
+ override def dataType: DataType = BinaryType
+
+ protected def createSummarySetOperations(mode: TupleSummaryMode):
SummarySetOperations[S]
+
+ protected def intersectSketches(
+ sketch1Bytes: Array[Byte],
+ sketch2Bytes: Array[Byte],
+ intersection: Intersection[S],
+ mode: TupleSummaryMode): Unit
+
+ override def nullSafeEval(sketch1Binary: Any, sketch2Binary: Any, modeInput:
Any): Any = {
+
+ val modeStr = modeInput.asInstanceOf[UTF8String].toString
Review Comment:
If this string is a constant literal, is there any way to detect the failure
at analysis time instead of execution time? Same for other scalar functions
that accept mode string arguments?
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -6158,13 +6164,7 @@
"message" : [
"Invalid call to <function>; only valid Theta sketch buffers are
supported as inputs (such as those produced by the `theta_sketch_agg`
function)."
],
- "sqlState" : "22546"
- },
- "THETA_INVALID_LG_NOM_ENTRIES" : {
Review Comment:
Will renaming this error class break any existing error-handling code, or is it safe?
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TupleSketchUtilsSuite.scala:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.datasketches.tuple.UpdatableSketchBuilder
+import org.apache.datasketches.tuple.adouble.{DoubleSummary,
DoubleSummaryFactory}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary,
IntegerSummaryFactory}
+
+import org.apache.spark.{SparkFunSuite, SparkRuntimeException}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+
+class TupleSketchUtilsSuite extends SparkFunSuite with SQLHelper {
+
+  test("TupleSummaryMode.fromString: accepts valid modes") {
+    for (modeName <- Seq("sum", "min", "max", "alwaysone")) {
+      // Parsing must succeed, and the parsed mode must print back as the original name.
+      val parsed = TupleSummaryMode.fromString(modeName, "test_function")
+      assert(parsed != null)
+      assert(parsed.toString == modeName)
+    }
+  }
+
+  test("TupleSummaryMode.fromString: case insensitive") {
+    // Mixed-case spellings must resolve to the same modes as their lowercase forms.
+    val cases: Seq[(String, TupleSummaryMode)] = Seq(
+      "SUM" -> TupleSummaryMode.Sum,
+      "Min" -> TupleSummaryMode.Min,
+      "MAX" -> TupleSummaryMode.Max,
+      "AlwaysOne" -> TupleSummaryMode.AlwaysOne)
+    for ((input, expected) <- cases) {
+      assert(TupleSummaryMode.fromString(input, "test_function") == expected)
+    }
+  }
+
+  test("TupleSummaryMode.fromString: throws exception for invalid modes") {
+    // The empty string is included and must be rejected like any other unknown name.
+    for (badMode <- Seq("invalid", "average", "count", "multiply", "")) {
+      val error = intercept[SparkRuntimeException] {
+        TupleSummaryMode.fromString(badMode, "test_function")
+      }
+      checkError(
+        exception = error,
+        condition = "TUPLE_INVALID_SKETCH_MODE",
+        parameters = Map(
+          "function" -> "`test_function`",
+          "mode" -> badMode,
+          "validModes" -> TupleSummaryMode.validModeStrings.mkString(", ")))
+    }
+  }
+
+  test("TupleSummaryMode: converts to DoubleSummary.Mode correctly") {
+    val expectations: Seq[(TupleSummaryMode, DoubleSummary.Mode)] = Seq(
+      TupleSummaryMode.Sum -> DoubleSummary.Mode.Sum,
+      TupleSummaryMode.Min -> DoubleSummary.Mode.Min,
+      TupleSummaryMode.Max -> DoubleSummary.Mode.Max,
+      TupleSummaryMode.AlwaysOne -> DoubleSummary.Mode.AlwaysOne)
+    expectations.foreach { case (mode, doubleMode) =>
+      assert(mode.toDoubleSummaryMode == doubleMode)
+    }
+  }
+
+  test("TupleSummaryMode: converts to IntegerSummary.Mode correctly") {
+    val expectations: Seq[(TupleSummaryMode, IntegerSummary.Mode)] = Seq(
+      TupleSummaryMode.Sum -> IntegerSummary.Mode.Sum,
+      TupleSummaryMode.Min -> IntegerSummary.Mode.Min,
+      TupleSummaryMode.Max -> IntegerSummary.Mode.Max,
+      TupleSummaryMode.AlwaysOne -> IntegerSummary.Mode.AlwaysOne)
+    expectations.foreach { case (mode, integerMode) =>
+      assert(mode.toIntegerSummaryMode == integerMode)
+    }
+  }
+
+  test("heapifyDoubleSketch: successfully deserializes valid tuple sketch bytes") {
+    // Serialize a small, valid Sum-mode sketch; the heapified copy is compared against it.
+    val source = new UpdatableSketchBuilder[java.lang.Double, DoubleSummary](
+      new DoubleSummaryFactory(DoubleSummary.Mode.Sum)).build()
+    Seq("test1" -> 1.0, "test2" -> 2.0, "test3" -> 3.0).foreach { case (key, value) =>
+      source.update(key, value)
+    }
+    val expected = source.compact()
+
+    val roundTripped =
+      TupleSketchUtils.heapifyDoubleSketch(expected.toByteArray, "test_function")
+
+    assert(roundTripped != null)
+    assert(roundTripped.getEstimate == expected.getEstimate)
+    assert(roundTripped.getRetainedEntries == expected.getRetainedEntries)
+  }
+
+  test("heapifyDoubleSketch: throws exception for invalid bytes") {
+    // Arbitrary bytes that are not a serialized tuple sketch must surface as a Spark error.
+    val garbage = Array[Byte](1, 2, 3, 4, 5)
+    val error = intercept[SparkRuntimeException] {
+      TupleSketchUtils.heapifyDoubleSketch(garbage, "test_function")
+    }
+    checkError(
+      exception = error,
+      condition = "TUPLE_INVALID_INPUT_SKETCH_BUFFER",
+      parameters = Map("function" -> "`test_function`"))
+  }
+
+  test("heapifyIntegerSketch: successfully deserializes valid tuple sketch bytes") {
+    // Serialize a small, valid Sum-mode integer sketch; the heapified copy is compared against it.
+    val source = new UpdatableSketchBuilder[java.lang.Integer, IntegerSummary](
+      new IntegerSummaryFactory(IntegerSummary.Mode.Sum)).build()
+    Seq("test1" -> 1, "test2" -> 2, "test3" -> 3).foreach { case (key, value) =>
+      source.update(key, value)
+    }
+    val expected = source.compact()
+
+    val roundTripped =
+      TupleSketchUtils.heapifyIntegerSketch(expected.toByteArray, "test_function")
+
+    assert(roundTripped != null)
+    assert(roundTripped.getEstimate == expected.getEstimate)
+    assert(roundTripped.getRetainedEntries == expected.getRetainedEntries)
+  }
+
+  test("heapifyIntegerSketch: throws exception for invalid bytes") {
+    // Arbitrary bytes that are not a serialized tuple sketch must surface as a Spark error.
+    val garbage = Array[Byte](1, 2, 3, 4, 5)
+    val error = intercept[SparkRuntimeException] {
+      TupleSketchUtils.heapifyIntegerSketch(garbage, "test_function")
+    }
+    checkError(
+      exception = error,
+      condition = "TUPLE_INVALID_INPUT_SKETCH_BUFFER",
+      parameters = Map("function" -> "`test_function`"))
+  }
+
+  // Every input value below, and every sum of those values, is a small integer. Such numbers
+  // are exactly representable as IEEE 754 doubles, so the exact `==` checks on the aggregates
+  // are not affected by rounding (assuming aggregateNumericSummaries only adds and compares
+  // summary values).
+
+  /** Builds a compact Sum-mode double tuple sketch that stores values(i) under key "test<i+1>". */
+  private def sumModeDoubleSketchOf(values: Double*) = {
+    val sketch = new UpdatableSketchBuilder[java.lang.Double, DoubleSummary](
+      new DoubleSummaryFactory(DoubleSummary.Mode.Sum)).build()
+    values.zipWithIndex.foreach { case (value, i) => sketch.update(s"test${i + 1}", value) }
+    sketch.compact()
+  }
+
+  /** Builds a compact Sum-mode integer tuple sketch that stores values(i) under key "test<i+1>". */
+  private def sumModeIntegerSketchOf(values: Int*) = {
+    val sketch = new UpdatableSketchBuilder[java.lang.Integer, IntegerSummary](
+      new IntegerSummaryFactory(IntegerSummary.Mode.Sum)).build()
+    values.zipWithIndex.foreach { case (value, i) => sketch.update(s"test${i + 1}", value) }
+    sketch.compact()
+  }
+
+  /** Runs aggregateNumericSummaries in `mode` over a freshly built double sketch of `values`. */
+  private def aggregateDoubleValues(mode: TupleSummaryMode, values: Double*): Double =
+    TupleSketchUtils.aggregateNumericSummaries[DoubleSummary, Double](
+      sumModeDoubleSketchOf(values: _*).iterator(),
+      mode,
+      it => it.getSummary.getValue)
+
+  /** Runs aggregateNumericSummaries in `mode` over a freshly built integer sketch of `values`. */
+  private def aggregateIntegerValues(mode: TupleSummaryMode, values: Int*): Long =
+    TupleSketchUtils.aggregateNumericSummaries[IntegerSummary, Long](
+      sumModeIntegerSketchOf(values: _*).iterator(),
+      mode,
+      it => it.getSummary.getValue.toLong)
+
+  test("aggregateNumericSummaries: sum mode aggregates correctly for Double") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Sum, 1.0, 2.0, 3.0) == 6.0)
+  }
+
+  test("aggregateNumericSummaries: min mode finds minimum for Double") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Min, 5.0, 2.0, 8.0) == 2.0)
+  }
+
+  test("aggregateNumericSummaries: max mode finds maximum for Double") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Max, 5.0, 2.0, 8.0) == 8.0)
+  }
+
+  test("aggregateNumericSummaries: alwaysone mode counts entries for Double") {
+    assert(aggregateDoubleValues(TupleSummaryMode.AlwaysOne, 5.0, 2.0, 8.0) == 3.0)
+  }
+
+  test("aggregateNumericSummaries: sum mode aggregates correctly for Long") {
+    assert(aggregateIntegerValues(TupleSummaryMode.Sum, 10, 20, 30) == 60L)
+  }
+
+  test("aggregateNumericSummaries: min mode finds minimum for Long") {
+    assert(aggregateIntegerValues(TupleSummaryMode.Min, 50, 20, 80) == 20L)
+  }
+
+  test("aggregateNumericSummaries: max mode finds maximum for Long") {
+    assert(aggregateIntegerValues(TupleSummaryMode.Max, 50, 20, 80) == 80L)
+  }
+
+  test("aggregateNumericSummaries: alwaysone mode counts entries for Long") {
+    assert(aggregateIntegerValues(TupleSummaryMode.AlwaysOne, 50, 20, 80) == 3L)
+  }
+
+  test("aggregateNumericSummaries: empty sketch returns zero for sum mode") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Sum) == 0.0)
+  }
+
+  test("aggregateNumericSummaries: empty sketch returns zero for min mode") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Min) == 0.0)
+  }
+
+  test("aggregateNumericSummaries: empty sketch returns zero for max mode") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Max) == 0.0)
+  }
+
+  test("aggregateNumericSummaries: empty sketch returns zero for alwaysone mode") {
+    assert(aggregateDoubleValues(TupleSummaryMode.AlwaysOne) == 0.0)
+  }
+
+  test("aggregateNumericSummaries: single entry sketch") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Sum, 42.0) == 42.0)
+    assert(aggregateDoubleValues(TupleSummaryMode.Min, 42.0) == 42.0)
+    assert(aggregateDoubleValues(TupleSummaryMode.Max, 42.0) == 42.0)
+    assert(aggregateDoubleValues(TupleSummaryMode.AlwaysOne, 42.0) == 1.0)
+  }
+
+  test("aggregateNumericSummaries: negative values for sum mode") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Sum, -5.0, -2.0, -8.0) == -15.0)
+  }
+
+  test("aggregateNumericSummaries: negative values for min mode") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Min, -5.0, -2.0, -8.0) == -8.0)
+  }
+
+  test("aggregateNumericSummaries: negative values for max mode") {
+    assert(aggregateDoubleValues(TupleSummaryMode.Max, -5.0, -2.0, -8.0) == -2.0)
+  }
+
+  test("aggregateNumericSummaries: mixed positive and negative values") {
+    val mixed = Seq(-5.0, 10.0, -3.0, 7.0)
+    assert(aggregateDoubleValues(TupleSummaryMode.Sum, mixed: _*) == 9.0)
+    assert(aggregateDoubleValues(TupleSummaryMode.Min, mixed: _*) == -5.0)
+    assert(aggregateDoubleValues(TupleSummaryMode.Max, mixed: _*) == 10.0)
+  }
+
+ test("aggregateNumericSummaries: zero values") {
+ val summaryFactory = new DoubleSummaryFactory(DoubleSummary.Mode.Sum)
+ val updateSketch = new UpdatableSketchBuilder[java.lang.Double,
DoubleSummary](summaryFactory)
+ .build()
+
+ updateSketch.update("test1", 0.0)
+ updateSketch.update("test2", 0.0)
+ updateSketch.update("test3", 0.0)
+
+ val compactSketch = updateSketch.compact()
+
+ assert(TupleSketchUtils.aggregateNumericSummaries[DoubleSummary, Double](
+ compactSketch.iterator(), TupleSummaryMode.Sum, it =>
it.getSummary.getValue) == 0.0)
+
+ assert(TupleSketchUtils.aggregateNumericSummaries[DoubleSummary, Double](
+ compactSketch.iterator(), TupleSummaryMode.Min, it =>
it.getSummary.getValue) == 0.0)
+
+ assert(TupleSketchUtils.aggregateNumericSummaries[DoubleSummary, Double](
+ compactSketch.iterator(), TupleSummaryMode.Max, it =>
it.getSummary.getValue) == 0.0)
+
+ assert(TupleSketchUtils.aggregateNumericSummaries[DoubleSummary, Double](
+ compactSketch.iterator(), TupleSummaryMode.AlwaysOne, it =>
it.getSummary.getValue) == 3.0)
Review Comment:
Will we have any floating-point precision issues with these tests, or are
all the values exactly representable as IEEE 754 floating-point numbers?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/tupleUnion.scala:
##########
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.datasketches.tuple.{Summary, SummarySetOperations, Union}
+import org.apache.datasketches.tuple.adouble.{DoubleSummary,
DoubleSummaryFactory, DoubleSummarySetOperations}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary,
IntegerSummaryFactory, IntegerSummarySetOperations}
+
+import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature,
InputParameter}
+import org.apache.spark.sql.catalyst.util.{ThetaSketchUtils, TupleSketchUtils,
TupleSummaryMode}
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType,
IntegerType}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Merges two serialized Apache DataSketches TupleSketches that carry double summaries, using a
+ * TupleSketch Union. When a key appears in both inputs, its two summaries are combined according
+ * to the requested mode (sum, min, max, or alwaysone).
+ *
+ * Keys are hashed by type as well as value, so one logical value expressed in two types (for
+ * example String("123") and Int(123)) is treated as two distinct keys. Every sketch being merged
+ * must use the same summary value type.
+ *
+ * See [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]] for more information.
+ *
+ * @param first
+ *   serialized TupleSketch with double summaries
+ * @param second
+ *   second serialized TupleSketch with double summaries
+ * @param third
+ *   lgNomEntries: log base 2 of the nominal entries, which sets the size of the result sketch
+ * @param fourth
+ *   mode: how the summaries of duplicate keys are combined (sum, min, max, alwaysone)
+ */
+case class TupleUnionDouble(
+    first: Expression,
+    second: Expression,
+    third: Expression,
+    fourth: Expression)
+  extends TupleUnionBase[DoubleSummary] {
+
+  /** Explicit lgNomEntries, `sum` mode. */
+  def this(first: Expression, second: Expression, third: Expression) =
+    this(first, second, third, Literal(TupleSummaryMode.Sum.toString))
+
+  /** Default lgNomEntries (ThetaSketchUtils.DEFAULT_LG_NOM_LONGS), `sum` mode. */
+  def this(first: Expression, second: Expression) =
+    this(first, second, Literal(ThetaSketchUtils.DEFAULT_LG_NOM_LONGS))
+
+  override def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression,
+      newFourth: Expression): TupleUnionDouble =
+    copy(first = newFirst, second = newSecond, third = newThird, fourth = newFourth)
+
+  override def prettyName: String = "tuple_union_double"
+
+  override protected def createSummarySetOperations(
+      mode: TupleSummaryMode): SummarySetOperations[DoubleSummary] =
+    new DoubleSummarySetOperations(mode.toDoubleSummaryMode)
+
+  override protected def unionSketches(
+      sketch1Bytes: Array[Byte],
+      sketch2Bytes: Array[Byte],
+      union: Union[DoubleSummary],
+      mode: TupleSummaryMode): Unit = {
+    // Deserialize both inputs before merging either of them into the union.
+    val inputs = Seq(sketch1Bytes, sketch2Bytes)
+      .map(bytes => TupleSketchUtils.heapifyDoubleSketch(bytes, prettyName))
+    inputs.foreach(sketch => union.union(sketch))
+  }
+}
+
+/**
+ * Merges a serialized Apache DataSketches TupleSketch that carries double summaries with a
+ * serialized ThetaSketch, using a TupleSketch Union. Theta entries have no summary of their own,
+ * so the union is given a default double summary created for the requested mode. When a key
+ * appears in both inputs, its summaries are combined according to that mode (sum, min, max, or
+ * alwaysone).
+ *
+ * Keys are hashed by type as well as value, so one logical value expressed in two types (for
+ * example String("123") and Int(123)) is treated as two distinct keys. Every sketch being merged
+ * must use the same summary value type.
+ *
+ * See [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]] for more information.
+ *
+ * @param first
+ *   serialized TupleSketch with double summaries
+ * @param second
+ *   serialized ThetaSketch
+ * @param third
+ *   lgNomEntries: log base 2 of the nominal entries, which sets the size of the result sketch
+ * @param fourth
+ *   mode: how the summaries of duplicate keys are combined (sum, min, max, alwaysone)
+ */
+case class TupleUnionThetaDouble(
+    first: Expression,
+    second: Expression,
+    third: Expression,
+    fourth: Expression)
+  extends TupleUnionBase[DoubleSummary] {
+
+  /** Explicit lgNomEntries, `sum` mode. */
+  def this(first: Expression, second: Expression, third: Expression) =
+    this(first, second, third, Literal(TupleSummaryMode.Sum.toString))
+
+  /** Default lgNomEntries (ThetaSketchUtils.DEFAULT_LG_NOM_LONGS), `sum` mode. */
+  def this(first: Expression, second: Expression) =
+    this(first, second, Literal(ThetaSketchUtils.DEFAULT_LG_NOM_LONGS))
+
+  override def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression,
+      newFourth: Expression): TupleUnionThetaDouble =
+    copy(first = newFirst, second = newSecond, third = newThird, fourth = newFourth)
+
+  override def prettyName: String = "tuple_union_theta_double"
+
+  override protected def createSummarySetOperations(
+      mode: TupleSummaryMode): SummarySetOperations[DoubleSummary] =
+    new DoubleSummarySetOperations(mode.toDoubleSummaryMode)
+
+  override protected def unionSketches(
+      sketch1Bytes: Array[Byte],
+      sketch2Bytes: Array[Byte],
+      union: Union[DoubleSummary],
+      mode: TupleSummaryMode): Unit = {
+    val tuplePart = TupleSketchUtils.heapifyDoubleSketch(sketch1Bytes, prettyName)
+    val thetaPart = ThetaSketchUtils.wrapCompactSketch(sketch2Bytes, prettyName)
+
+    union.union(tuplePart)
+    // Theta entries carry no summary, so supply a default one built for the requested mode.
+    union.union(thetaPart, new DoubleSummaryFactory(mode.toDoubleSummaryMode).newSummary())
+  }
+}
+
+/**
+ * Merges two serialized Apache DataSketches TupleSketches that carry integer summaries, using a
+ * TupleSketch Union. When a key appears in both inputs, its two summaries are combined according
+ * to the requested mode (sum, min, max, or alwaysone).
+ *
+ * Keys are hashed by type as well as value, so one logical value expressed in two types (for
+ * example String("123") and Int(123)) is treated as two distinct keys. Every sketch being merged
+ * must use the same summary value type.
+ *
+ * See [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]] for more information.
+ *
+ * @param first
+ *   serialized TupleSketch with integer summaries
+ * @param second
+ *   second serialized TupleSketch with integer summaries
+ * @param third
+ *   lgNomEntries: log base 2 of the nominal entries, which sets the size of the result sketch
+ * @param fourth
+ *   mode: how the summaries of duplicate keys are combined (sum, min, max, alwaysone)
+ */
+case class TupleUnionInteger(
+    first: Expression,
+    second: Expression,
+    third: Expression,
+    fourth: Expression)
+  extends TupleUnionBase[IntegerSummary] {
+
+  /** Explicit lgNomEntries, `sum` mode. */
+  def this(first: Expression, second: Expression, third: Expression) =
+    this(first, second, third, Literal(TupleSummaryMode.Sum.toString))
+
+  /** Default lgNomEntries (ThetaSketchUtils.DEFAULT_LG_NOM_LONGS), `sum` mode. */
+  def this(first: Expression, second: Expression) =
+    this(first, second, Literal(ThetaSketchUtils.DEFAULT_LG_NOM_LONGS))
+
+  override def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression,
+      newFourth: Expression): TupleUnionInteger =
+    copy(first = newFirst, second = newSecond, third = newThird, fourth = newFourth)
+
+  override def prettyName: String = "tuple_union_integer"
+
+  override protected def createSummarySetOperations(
+      mode: TupleSummaryMode): SummarySetOperations[IntegerSummary] = {
+    // IntegerSummarySetOperations takes two modes; both are set to the requested one.
+    val summaryMode = mode.toIntegerSummaryMode
+    new IntegerSummarySetOperations(summaryMode, summaryMode)
+  }
+
+  override protected def unionSketches(
+      sketch1Bytes: Array[Byte],
+      sketch2Bytes: Array[Byte],
+      union: Union[IntegerSummary],
+      mode: TupleSummaryMode): Unit = {
+    // Deserialize both inputs before merging either of them into the union.
+    val inputs = Seq(sketch1Bytes, sketch2Bytes)
+      .map(bytes => TupleSketchUtils.heapifyIntegerSketch(bytes, prettyName))
+    inputs.foreach(sketch => union.union(sketch))
+  }
+}
+
+/**
+ * Merges a serialized Apache DataSketches TupleSketch that carries integer summaries with a
+ * serialized ThetaSketch, using a TupleSketch Union. Theta entries have no summary of their own,
+ * so the union is given a default integer summary created for the requested mode. When a key
+ * appears in both inputs, its summaries are combined according to that mode (sum, min, max, or
+ * alwaysone).
+ *
+ * Keys are hashed by type as well as value, so one logical value expressed in two types (for
+ * example String("123") and Int(123)) is treated as two distinct keys. Every sketch being merged
+ * must use the same summary value type.
+ *
+ * See [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]] for more information.
+ *
+ * @param first
+ *   serialized TupleSketch with integer summaries
+ * @param second
+ *   serialized ThetaSketch
+ * @param third
+ *   lgNomEntries: log base 2 of the nominal entries, which sets the size of the result sketch
+ * @param fourth
+ *   mode: how the summaries of duplicate keys are combined (sum, min, max, alwaysone)
+ */
+case class TupleUnionThetaInteger(
+    first: Expression,
+    second: Expression,
+    third: Expression,
+    fourth: Expression)
+  extends TupleUnionBase[IntegerSummary] {
+
+  /** Explicit lgNomEntries, `sum` mode. */
+  def this(first: Expression, second: Expression, third: Expression) =
+    this(first, second, third, Literal(TupleSummaryMode.Sum.toString))
+
+  /** Default lgNomEntries (ThetaSketchUtils.DEFAULT_LG_NOM_LONGS), `sum` mode. */
+  def this(first: Expression, second: Expression) =
+    this(first, second, Literal(ThetaSketchUtils.DEFAULT_LG_NOM_LONGS))
+
+  override def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression,
+      newFourth: Expression): TupleUnionThetaInteger =
+    copy(first = newFirst, second = newSecond, third = newThird, fourth = newFourth)
+
+  override def prettyName: String = "tuple_union_theta_integer"
+
+  override protected def createSummarySetOperations(
+      mode: TupleSummaryMode): SummarySetOperations[IntegerSummary] = {
+    // IntegerSummarySetOperations takes two modes; both are set to the requested one.
+    val summaryMode = mode.toIntegerSummaryMode
+    new IntegerSummarySetOperations(summaryMode, summaryMode)
+  }
+
+  override protected def unionSketches(
+      sketch1Bytes: Array[Byte],
+      sketch2Bytes: Array[Byte],
+      union: Union[IntegerSummary],
+      mode: TupleSummaryMode): Unit = {
+    val tuplePart = TupleSketchUtils.heapifyIntegerSketch(sketch1Bytes, prettyName)
+    val thetaPart = ThetaSketchUtils.wrapCompactSketch(sketch2Bytes, prettyName)
+
+    union.union(tuplePart)
+    // Theta entries carry no summary, so supply a default one built for the requested mode.
+    union.union(thetaPart, new IntegerSummaryFactory(mode.toIntegerSummaryMode).newSummary())
+  }
+}
+
+/**
+ * Shared implementation of the tuple sketch union expressions.
+ *
+ * Subclasses supply two things: the summary set operations for a mode, and the logic that feeds
+ * their two serialized inputs into the union. This class does the rest: it validates
+ * lgNomEntries, parses the mode string, builds the union, and returns the serialized result.
+ */
+abstract class TupleUnionBase[S <: Summary]
+  extends QuaternaryExpression
+  with CodegenFallback
+  with ExpectsInputTypes {
+
+  // Any null input yields a null result; nullSafeEval only ever sees non-null values.
+  override def nullIntolerant: Boolean = true
+
+  /** Two sketch buffers, the integer lgNomEntries, and a (collation-aware) mode string. */
+  override def inputTypes: Seq[AbstractDataType] = Seq(
+    BinaryType,
+    BinaryType,
+    IntegerType,
+    StringTypeWithCollation(supportsTrimCollation = true))
+
+  override def dataType: DataType = BinaryType
+
+  /** Builds the summary set operations the union uses for `mode`. */
+  protected def createSummarySetOperations(mode: TupleSummaryMode): SummarySetOperations[S]
+
+  /** Deserializes both input buffers and merges them into `union`. */
+  protected def unionSketches(
+      sketch1Bytes: Array[Byte],
+      sketch2Bytes: Array[Byte],
+      union: Union[S],
+      mode: TupleSummaryMode): Unit
+
+  override def nullSafeEval(
+      sketch1Binary: Any,
+      sketch2Binary: Any,
+      lgNomEntries: Any,
+      modeInput: Any): Any = {
+    // Range-check lgNomEntries before it is used to compute 2^lgK nominal entries below.
+    val lgK = lgNomEntries.asInstanceOf[Int]
+    ThetaSketchUtils.checkLgNomLongs(lgK, prettyName)
+
+    val summaryMode =
+      TupleSummaryMode.fromString(modeInput.asInstanceOf[UTF8String].toString, prettyName)
+
+    val union = new Union(1 << lgK, createSummarySetOperations(summaryMode))
+    unionSketches(
+      sketch1Binary.asInstanceOf[Array[Byte]],
+      sketch2Binary.asInstanceOf[Array[Byte]],
+      union,
+      summaryMode)
+    union.getResult.toByteArray
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(tupleSketch1, tupleSketch2, lgNomEntries, mode) - Merges two binary
representations of Datasketches
+ TupleSketch objects with double summary data type using a TupleSketch
Union object. Users can
+ set lgNomEntries to a value between 4 and 26 (defaults to 12) and mode to
'sum', 'min', 'max',
+ or 'alwaysone' (defaults to 'sum'). """,
+ examples = """
+ Examples:
+ > SELECT
tuple_sketch_estimate_double(_FUNC_(tuple_sketch_agg_double(col1, val1),
tuple_sketch_agg_double(col2, val2))) FROM VALUES (1, 1.0D, 4, 4.0D), (2, 2.0D,
5, 5.0D), (3, 3.0D, 6, 6.0D) tab(col1, val1, col2, val2);
+ 6.0
+ """,
+ group = "misc_funcs",
+ since = "4.2.0")
+// scalastyle:on line.size.limit
+object TupleUnionDoubleExpressionBuilder extends ExpressionBuilder {
+  /** Two required sketches; lgNomEntries and mode fall back to their defaults when omitted. */
+  final val defaultFunctionSignature = FunctionSignature(
+    Seq(
+      InputParameter("first"),
+      InputParameter("second"),
+      InputParameter("lgNomEntries", Some(Literal(ThetaSketchUtils.DEFAULT_LG_NOM_LONGS))),
+      InputParameter("mode", Some(Literal(TupleSummaryMode.Sum.toString)))))
+
+  override def functionSignature: Option[FunctionSignature] = Some(defaultFunctionSignature)
+
+  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
+    // Rearranging the arguments against the signature fills in any omitted defaults,
+    // so exactly four expressions arrive here.
+    assert(expressions.size == 4)
+    TupleUnionDouble(expressions.head, expressions(1), expressions(2), expressions(3))
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(tupleSketch, thetaSketch, lgNomEntries, mode) - Merges the binary
representation of a
+ Datasketches TupleSketch with double summary data type with the binary
representation of a
+ Datasketches ThetaSketch using a TupleSketch Union object. The ThetaSketch
entries are assigned
+ a default double summary value. Users can set lgNomEntries to a value
between 4 and 26 (defaults to 12),
+ and mode to 'sum', 'min', 'max', or 'alwaysone' (defaults to 'sum'). """,
+ examples = """
+ Examples:
+ > SELECT
tuple_sketch_estimate_double(_FUNC_(tuple_sketch_agg_double(col1, val1),
theta_sketch_agg(col2))) FROM VALUES (1, 1.0D, 4), (2, 2.0D, 5), (3, 3.0D, 6)
tab(col1, val1, col2);
+ 6.0
+ """,
+ group = "misc_funcs",
+ since = "4.2.0")
+// scalastyle:on line.size.limit
+object TupleUnionThetaDoubleExpressionBuilder extends ExpressionBuilder {
Review Comment:
One last major refactoring request for this PR. Would you mind splitting off
all the functions that deal with Theta sketches from this PR, so that this PR can
continue to focus just on tuple sketches? The amount of new code is large. It
is split into files and tested well now, but we could improve focus by moving
that part into a separate step. The remaining new functions can still work well
on their own as a group by supporting creating tuple sketches and computing
merges and estimates on them. Then we can create and review the extra
functionality supporting tuple + theta sketches in a separate independent PR.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/tupleSketchAgg.scala:
##########
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.tuple.{Sketch, SummaryFactory,
SummarySetOperations, Union, UpdatableSketchBuilder, UpdatableSummary}
+import org.apache.datasketches.tuple.adouble.{DoubleSummary,
DoubleSummaryFactory, DoubleSummarySetOperations}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary,
IntegerSummaryFactory, IntegerSummarySetOperations}
+
+import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder,
TypeCheckResult}
+import org.apache.spark.sql.catalyst.expressions.{Expression,
ExpressionDescription, ImplicitCastInputTypes, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature,
InputParameter}
+import org.apache.spark.sql.catalyst.trees.QuaternaryLike
+import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory,
SketchSize, SummaryAggregateMode, ThetaSketchUtils, TupleSketchUtils,
TupleSummaryMode}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.{AbstractDataType, ArrayType, BinaryType,
DataType, DoubleType, FloatType, IntegerType, LongType, StringType,
TypeCollection}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Aggregate function `tuple_sketch_agg_double`. It feeds (key, summary) pairs into an Apache
+ * DataSketches TupleSketch whose summaries are doubles, then outputs the sketch serialized to
+ * binary. The sketch provides a probabilistic approximation of the number of distinct keys, and
+ * the summaries attached to each key are combined using the configured mode (sum, min, max,
+ * alwaysone).
+ *
+ * Hashing takes both the type and the value of a key into account, so the same logical value
+ * held in different types (for example String("123") and Int(123)) counts as two distinct keys.
+ * Summary values, on the other hand, must keep one consistent type across calls: mixing types
+ * can yield incorrect results or lose precision. Encoding the summary type in the function-name
+ * suffix (`_double`) is what keeps this type-safe.
+ *
+ * More information: [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]]
+ *
+ * @param key
+ *   expression whose distinct values are approximately counted
+ * @param summary
+ *   double-typed expression whose values are combined per key according to `mode`
+ * @param lgNomEntries
+ *   log-base-2 of the nominal entry count; determines the number of buckets in the sketch
+ * @param mode
+ *   how the summaries of a key are combined: sum, min, max or alwaysone
+ * @param mutableAggBufferOffset
+ *   position of this aggregate inside the mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   position of this aggregate inside the input aggregation buffer
+ */
+case class TupleSketchAggDouble(
+    key: Expression,
+    summary: Expression,
+    lgNomEntries: Expression,
+    mode: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int)
+  extends TupleSketchAggBase[java.lang.Double, DoubleSummary]
+  with QuaternaryLike[Expression] {
+
+  // Auxiliary constructors. Scala 2 requires a self-invocation to target a constructor declared
+  // earlier, so they are listed from most to fewest arguments and each fills in one more default.
+
+  /** Explicit lgNomEntries and mode; both buffer offsets start at 0. */
+  def this(key: Expression, summary: Expression, lgNomEntries: Expression, mode: Expression) =
+    this(key, summary, lgNomEntries, mode, 0, 0)
+
+  /** Explicit lgNomEntries; mode defaults to 'sum'. */
+  def this(key: Expression, summary: Expression, lgNomEntries: Expression) =
+    this(key, summary, lgNomEntries, Literal(TupleSummaryMode.Sum.toString))
+
+  /** lgNomEntries defaults to ThetaSketchUtils.DEFAULT_LG_NOM_LONGS; mode defaults to 'sum'. */
+  def this(key: Expression, summary: Expression) =
+    this(key, summary, Literal(ThetaSketchUtils.DEFAULT_LG_NOM_LONGS))
+
+  // The four children, in QuaternaryLike order.
+  override def first: Expression = key
+  override def second: Expression = summary
+  override def third: Expression = lgNomEntries
+  override def fourth: Expression = mode
+
+  override protected def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression,
+      newFourth: Expression): TupleSketchAggDouble =
+    copy(key = newFirst, summary = newSecond, lgNomEntries = newThird, mode = newFourth)
+
+  // Buffer-offset setters: each returns a copy of this aggregate with one offset replaced.
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): TupleSketchAggDouble =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): TupleSketchAggDouble =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  /** SQL-facing function name; also forwarded to the sketch deserializer in heapifySketch. */
+  override def prettyName: String = "tuple_sketch_agg_double"
+
+  /** Summary inputs are accepted as DoubleType. */
+  override protected def summaryInputType: AbstractDataType = DoubleType
+
+  /** Builds the factory for new double summaries, configured with the selected mode. */
+  override protected def createSummaryFactory(): SummaryFactory[DoubleSummary] =
+    new DoubleSummaryFactory(modeEnum.toDoubleSummaryMode)
+
+  /** Builds the set operations used when merging double summaries, using the selected mode. */
+  override protected def createSummarySetOperations(): SummarySetOperations[DoubleSummary] =
+    new DoubleSummarySetOperations(modeEnum.toDoubleSummaryMode)
+
+  /**
+   * Deserializes (heapifies) a TupleSketch with double summaries from its binary form.
+   *
+   * @param buffer
+   *   serialized sketch bytes
+   * @return
+   *   the heapified Sketch[DoubleSummary]
+   */
+  override protected def heapifySketch(buffer: Array[Byte]): Sketch[DoubleSummary] =
+    TupleSketchUtils.heapifyDoubleSketch(buffer, prettyName)
+}
+
+/**
+ * The TupleSketchAggInteger function utilizes a Datasketches TupleSketch
instance to count a
+ * probabilistic approximation of the number of unique values in a given
column with associated
+ * integer type summary values that can be aggregated using different modes
(sum, min, max,
+ * alwaysone), and outputs the binary representation of the TupleSketch.
+ *
+ * Keys are hashed internally based on their type and value - the same logical
value in different
+ * types (e.g., String("123") and Int(123)) will be treated as distinct keys.
However, summary
+ * value types must be consistent across all calls; mixing types can produce
incorrect results or
+ * precision loss. The value type suffix in the function name (e.g., _integer)
ensures type safety.
+ *
+ * See [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]] for
more information.
+ *
+ * @param key
+ * key expression against which unique counting will occur
+ * @param summary
+ * summary expression (integer type) against which different mode
aggregations will occur
+ * @param lgNomEntries
+ * the log-base-2 of nomEntries decides the number of buckets for the sketch
+ * @param mode
+ * the aggregation mode for numeric summaries (sum, min, max, alwaysone)
+ * @param mutableAggBufferOffset
+ * offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ * offset for input aggregation buffer
+ */
+case class TupleSketchAggInteger(
+ key: Expression,
+ summary: Expression,
+ lgNomEntries: Expression,
+ mode: Expression,
+ override val mutableAggBufferOffset: Int,
+ override val inputAggBufferOffset: Int)
+ extends TupleSketchAggBase[Integer, IntegerSummary]
+ with QuaternaryLike[Expression] {
+
+ // Constructors
+ def this(key: Expression, summary: Expression) = {
+ this(
+ key,
+ summary,
+ Literal(ThetaSketchUtils.DEFAULT_LG_NOM_LONGS),
+ Literal(TupleSummaryMode.Sum.toString),
+ 0,
+ 0)
+ }
+
+ def this(key: Expression, summary: Expression, lgNomEntries: Expression) = {
+ this(key, summary, lgNomEntries, Literal(TupleSummaryMode.Sum.toString),
0, 0)
+ }
+
+ def this(key: Expression, summary: Expression, lgNomEntries: Expression,
mode: Expression) = {
+ this(key, summary, lgNomEntries, mode, 0, 0)
+ }
+
+ // Copy constructors required by ImperativeAggregate
+ override def withNewMutableAggBufferOffset(
+ newMutableAggBufferOffset: Int): TupleSketchAggInteger =
+ copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+ override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int):
TupleSketchAggInteger =
+ copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+ override protected def withNewChildrenInternal(
+ newFirst: Expression,
+ newSecond: Expression,
+ newThird: Expression,
+ newFourth: Expression): TupleSketchAggInteger =
+ copy(key = newFirst, summary = newSecond, lgNomEntries = newThird, mode =
newFourth)
+
+ override def first: Expression = key
+ override def second: Expression = summary
+ override def third: Expression = lgNomEntries
+ override def fourth: Expression = mode
+
+ // Override for TypedImperativeAggregate
+ override def prettyName: String = "tuple_sketch_agg_integer"
+
+ /** Specifies accepted summary input types (integer). */
+ override protected def summaryInputType: AbstractDataType =
+ IntegerType
Review Comment:
Is it possible for the integer summaries to overflow past the maximum
IntegerType value (for example, when many values are combined in 'sum' mode)?
Do you think we need overflow tests and/or explicit overflow handling for this?
##########
sql/core/src/test/resources/sql-tests/inputs/tuplesketch.sql:
##########
@@ -0,0 +1,991 @@
+-- Positive test cases
+-- Create tables with key-value pairs for tuple sketches
+
+-- Integer key with double values
+DROP TABLE IF EXISTS t_int_double_1_5_through_7_11;
+CREATE TABLE t_int_double_1_5_through_7_11 AS
+VALUES
+ (1, 1.0D, 5, 5.0D), (2, 2.0D, 6, 6.0D), (3, 3.0D, 7, 7.0D),
+ (4, 4.0D, 8, 8.0D), (5, 5.0D, 9, 9.0D), (6, 6.0D, 10, 10.0D),
+ (7, 7.0D, 11, 11.0D) AS tab(key1, val1, key2, val2);
+
+-- Long key with double values
+DROP TABLE IF EXISTS t_long_double_1_5_through_7_11;
+CREATE TABLE t_long_double_1_5_through_7_11 AS
+VALUES
+ (1L, 1.0D, 5L, 5.00D), (2L, 2.00D, 6L, 6.00D), (3L, 3.00D, 7L, 7.00D),
+ (4L, 4.0D, 8L, 8.00D), (5L, 5.00D, 9L, 9.00D), (6L, 6.00D, 10L, 10.00D),
+ (7L, 7.0D, 11L, 11.00D) AS tab(key1, val1, key2, val2);
+
+-- Double key with double values
+DROP TABLE IF EXISTS t_double_double_1_1_1_4_through_1_5_1_8;
+CREATE TABLE t_double_double_1_1_1_4_through_1_5_1_8 AS
+SELECT CAST(key1 AS DOUBLE) AS key1, CAST(val1 AS DOUBLE) AS val1,
+ CAST(key2 AS DOUBLE) AS key2, CAST(val2 AS DOUBLE) AS val2
+FROM VALUES
+ (1.1, 1.0, 1.4, 4.0), (1.2, 2.0, 1.5, 5.0), (1.3, 3.0, 1.6, 6.0),
+ (1.4, 4.0, 1.7, 7.0), (1.5, 5.0, 1.8, 8.0) AS tab(key1, val1, key2, val2);
+
+-- Float key with double values
+DROP TABLE IF EXISTS t_float_double_1_1_1_4_through_1_5_1_8;
+CREATE TABLE t_float_double_1_1_1_4_through_1_5_1_8 AS
+SELECT CAST(key1 AS FLOAT) key1, CAST(val1 AS DOUBLE) AS val1,
+ CAST(key2 AS FLOAT) key2, CAST(val2 AS DOUBLE) AS val2
+FROM VALUES
+ (1.1, 1.0, 1.4, 4.0), (1.2, 2.0, 1.5, 5.0), (1.3, 3.0, 1.6, 6.0),
+ (1.4, 4.0, 1.7, 7.0), (1.5, 5.0, 1.8, 8.0) AS tab(key1, val1, key2, val2);
+
+-- String key with double values
+DROP TABLE IF EXISTS t_string_double_a_d_through_e_h;
+CREATE TABLE t_string_double_a_d_through_e_h AS
+VALUES
+ ('a', 1.00D, 'd', 4.00D), ('b', 2.00D, 'e', 5.00D), ('c', 3.00D, 'f', 6.00D),
+ ('d', 4.00D, 'g', 7.00D), ('e', 5.00D, 'h', 8.00D) AS tab(key1, val1, key2,
val2);
+
+-- Binary key with double values
+DROP TABLE IF EXISTS t_binary_double_a_b_through_e_f;
+CREATE TABLE t_binary_double_a_b_through_e_f AS
+VALUES
+ (X'A', 1.00D, X'B', 2.00D), (X'B', 2.00D, X'C', 3.00D), (X'C', 3.00D, X'D',
4.00D),
+ (X'D', 4.00D, X'E', 5.00D), (X'E', 5.00D, X'F', 6.00D) AS tab(key1, val1,
key2, val2);
+
+-- Array Integer key with double values
+DROP TABLE IF EXISTS t_array_int_double_1_3_through_4_6;
+CREATE TABLE t_array_int_double_1_3_through_4_6 AS
+VALUES
+ (ARRAY(1), 1.00D, ARRAY(3), 3.00D),
+ (ARRAY(2), 2.00D, ARRAY(4), 4.00D),
+ (ARRAY(3), 3.00D, ARRAY(5), 5.00D),
+ (ARRAY(4), 4.00D, ARRAY(6), 6.00D) AS tab(key1, val1, key2, val2);
+
+-- Array Long key with double values
+DROP TABLE IF EXISTS t_array_long_double_1_3_through_4_6;
+CREATE TABLE t_array_long_double_1_3_through_4_6 AS
+VALUES
+ (ARRAY(1L), 1.00D, ARRAY(3L), 3.00D),
+ (ARRAY(2L), 2.00D, ARRAY(4L), 4.00D),
+ (ARRAY(3L), 3.00D, ARRAY(5L), 5.00D),
+ (ARRAY(4L), 4.00D, ARRAY(6L), 6.00D) AS tab(key1, val1, key2, val2);
+
+-- Integer key with integer values
+DROP TABLE IF EXISTS t_int_int_1_5_through_7_11;
+CREATE TABLE t_int_int_1_5_through_7_11 AS
+VALUES
+ (1, 1, 5, 5), (2, 2, 6, 6), (3, 3, 7, 7),
+ (4, 4, 8, 8), (5, 5, 9, 9), (6, 6, 10, 10),
+ (7, 7, 11, 11) AS tab(key1, val1, key2, val2);
+
+DROP TABLE IF EXISTS t_string_collation;
+CREATE TABLE t_string_collation AS
+VALUES
+ ('', 1.00D), (' ', 2.00D), (CAST(X'C1' AS STRING), 3.00D), (CAST(X'80' AS
STRING), 4.00D),
+ ('\uFFFD', 5.00D), ('Å', 6.00D), ('å', 7.00D), ('a\u030A', 8.00D), ('Å ',
9.00D), ('å ', 10.00D),
+ ('a\u030A ', 11.00D) AS tab(key1, val1);
+
+-- Test basic tuple_sketch_agg_double with IntegerType key and double summary
from table
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1)) AS
result
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_integer with IntegerType key and integer summary from
table
+SELECT tuple_sketch_estimate_integer(tuple_sketch_agg_integer(key1, val1, 12))
AS result
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with ArrayType(IntegerType) key and double
summary from table
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1))
+FROM t_array_int_double_1_3_through_4_6;
+
+-- Test tuple_sketch_agg_double with ArrayType(LongType) key and double
summary from table
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key2, val2))
+FROM t_array_long_double_1_3_through_4_6;
+
+-- Test tuple_sketch_agg_double with BinaryType key and double summary from
table
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1))
+FROM t_binary_double_a_b_through_e_f;
+
+-- Test tuple_sketch_agg_double with DoubleType key and double summary from
table
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1))
+FROM t_double_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_sketch_agg_double with FloatType key and double summary from
table
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key2, val2))
+FROM t_float_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_sketch_agg_double with IntegerType key and explicit lgNomEntries
parameter
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1, 22))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with LongType key and double summary
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1))
+FROM t_long_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with StringType key and double summary
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1))
+FROM t_string_double_a_d_through_e_h;
+
+-- Test tuple_sketch_agg_double with explicit lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1, 12))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_integer with explicit lgNomEntries parameter
+SELECT tuple_sketch_estimate_integer(tuple_sketch_agg_integer(key1, val1, 12))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with all parameters including mode - sum
(default)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1, 12,
'sum'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with mode - min
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1, 12,
'min'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with mode - max
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1, 12,
'max'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with mode - alwaysone
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1, 12,
'alwaysone'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_summary_double to aggregate summary values - sum mode
+SELECT tuple_sketch_summary_double(tuple_sketch_agg_double(key1, val1, 12,
'sum'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_summary_double to aggregate summary values - min mode
+SELECT tuple_sketch_summary_double(tuple_sketch_agg_double(key1, val1, 12,
'min'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_summary_double to aggregate summary values - max mode
+SELECT tuple_sketch_summary_double(tuple_sketch_agg_double(key1, val1, 12,
'max'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_summary_double to aggregate summary values - alwaysone
mode
+SELECT tuple_sketch_summary_double(tuple_sketch_agg_double(key1, val1, 12,
'alwaysone'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_summary_integer with integer summary type
+SELECT tuple_sketch_summary_integer(tuple_sketch_agg_integer(key1, val1, 12,
'sum'), 'sum')
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_sketch_theta_double to get theta value from double sketch
+SELECT tuple_sketch_theta_double(tuple_sketch_agg_double(key1, val1))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_theta_double with LongType key
+SELECT tuple_sketch_theta_double(tuple_sketch_agg_double(key1, val1))
+FROM t_long_double_1_5_through_7_11;
+
+-- Test tuple_sketch_theta_double with StringType key
+SELECT tuple_sketch_theta_double(tuple_sketch_agg_double(key1, val1))
+FROM t_string_double_a_d_through_e_h;
+
+-- Test tuple_sketch_theta_integer to get theta value from integer sketch
+SELECT tuple_sketch_theta_integer(tuple_sketch_agg_integer(key1, val1, 12))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_union_double function with IntegerType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_double function with LongType key sketches and explicit
lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ tuple_sketch_agg_double(key1, val1, 15),
+ tuple_sketch_agg_double(key2, val2), 15))
+FROM t_long_double_1_5_through_7_11;
+
+-- Test tuple_union_double function with DoubleType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_double_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_union_double function with FloatType key sketches and explicit
lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ tuple_sketch_agg_double(key1, val1, 6),
+ tuple_sketch_agg_double(key2, val2, 15), 15))
+FROM t_float_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_union_double function with StringType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_string_double_a_d_through_e_h;
+
+-- Test tuple_union_double function with BinaryType key sketches and explicit
lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2, 20), 20))
+FROM t_binary_double_a_b_through_e_f;
+
+-- Test tuple_union_double function with ArrayType(IntegerType) key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_array_int_double_1_3_through_4_6;
+
+-- Test tuple_union_double function with ArrayType(LongType) key sketches and
explicit lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2, 13), 13))
+FROM t_array_long_double_1_3_through_4_6;
+
+-- Test tuple_union_double with lgNomEntries and mode parameters
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ tuple_sketch_agg_double(key1, val1, 12, 'sum'),
+ tuple_sketch_agg_double(key2, val2, 12, 'sum'), 12, 'sum'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_double to merge tuple sketch with theta sketch
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_double with explicit parameters
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ tuple_sketch_agg_double(key1, val1, 12, 'sum'),
+ theta_sketch_agg(key2, 12), 12, 'sum'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_intersection_double function with IntegerType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_intersection_double function with LongType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_double(
+ tuple_sketch_agg_double(key1, val1, 5),
+ tuple_sketch_agg_double(key2, val2, 12)))
+FROM t_long_double_1_5_through_7_11;
+
+-- Test tuple_intersection_double function with DoubleType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_double_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_intersection_double function with FloatType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_double(
+ tuple_sketch_agg_double(key1, val1, 5),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_float_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_intersection_double function with StringType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_string_double_a_d_through_e_h;
+
+-- Test tuple_intersection_double function with BinaryType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2, 22)))
+FROM t_binary_double_a_b_through_e_f;
+
+-- Test tuple_intersection_double function with ArrayType(IntegerType) key
sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_array_int_double_1_3_through_4_6;
+
+-- Test tuple_intersection_double function with ArrayType(LongType) key
sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2, 10)))
+FROM t_array_long_double_1_3_through_4_6;
+
+-- Test tuple_intersection_double with mode parameter
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_double(
+ tuple_sketch_agg_double(key1, val1, 12, 'min'),
+ tuple_sketch_agg_double(key2, val2, 12, 'min'), 'min'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_intersection_theta_double to intersect tuple sketch with theta
sketch
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_intersection_theta_double with explicit parameters
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_theta_double(
+ tuple_sketch_agg_double(key1, val1, 12, 'sum'),
+ theta_sketch_agg(key2, 12), 'sum'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_difference_double function with IntegerType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_difference_double function with LongType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2, 5)))
+FROM t_long_double_1_5_through_7_11;
+
+-- Test tuple_difference_double function with DoubleType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_double_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_difference_double function with FloatType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_double(
+ tuple_sketch_agg_double(key1, val1, 12),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_float_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_difference_double function with StringType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_string_double_a_d_through_e_h;
+
+-- Test tuple_difference_double function with BinaryType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_double(
+ tuple_sketch_agg_double(key1, val1, 6),
+ tuple_sketch_agg_double(key2, val2, 8)))
+FROM t_binary_double_a_b_through_e_f;
+
+-- Test tuple_difference_double function with ArrayType(IntegerType) key
sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2)))
+FROM t_array_int_double_1_3_through_4_6;
+
+-- Test tuple_difference_double function with ArrayType(LongType) key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_double(
+ tuple_sketch_agg_double(key1, val1),
+ tuple_sketch_agg_double(key2, val2, 4)))
+FROM t_array_long_double_1_3_through_4_6;
+
+-- Test tuple_difference_integer with integer summary type
+SELECT tuple_sketch_estimate_integer(
+ tuple_difference_integer(
+ tuple_sketch_agg_integer(key1, val1, 12, 'sum'),
+ tuple_sketch_agg_integer(key2, val2, 12, 'sum')))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_difference_theta_double to compute A-NOT-B with theta sketch
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_difference_theta_double with explicit parameters
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_theta_double(
+ tuple_sketch_agg_double(key1, val1, 12, 'sum'),
+ theta_sketch_agg(key2, 12)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_agg_double with IntegerType key and explicit lgNomEntries
parameter
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(sketch, 15, 'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_int_double_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2, 20) as sketch FROM
t_int_double_1_5_through_7_11);
+
+-- Test tuple_union_agg_double with DoubleType key sketches and explicit
lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(sketch, 12, 'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_double_double_1_1_1_4_through_1_5_1_8
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_double_double_1_1_1_4_through_1_5_1_8);
+
+-- Test tuple_union_agg_double with StringType key sketches and explicit
lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(sketch, 14, 'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_string_double_a_d_through_e_h
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_string_double_a_d_through_e_h);
+
+-- Test tuple_union_agg_double with LongType key sketches and explicit
lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(sketch, 10, 'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_long_double_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_long_double_1_5_through_7_11);
+
+-- Test tuple_union_agg_double with FloatType key sketches and explicit lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(sketch, 6, 'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_float_double_1_1_1_4_through_1_5_1_8
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_float_double_1_1_1_4_through_1_5_1_8);
+
+-- Test tuple_union_agg_double with BinaryType key sketches
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(sketch, 12, 'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_binary_double_a_b_through_e_f
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_binary_double_a_b_through_e_f);
+
+-- Test tuple_union_agg_double with ArrayType(IntegerType) key sketches and
explicit lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(sketch, 12, 'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_array_int_double_1_3_through_4_6
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_array_int_double_1_3_through_4_6);
+
+-- Test tuple_union_agg_double with ArrayType(LongType) key sketches and
explicit lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(sketch, 16, 'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_array_long_double_1_3_through_4_6
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_array_long_double_1_3_through_4_6);
+
+-- Test tuple_union_agg_integer with integer summary type
+SELECT tuple_sketch_estimate_integer(tuple_union_agg_integer(sketch, 12,
'sum'))
+FROM (SELECT tuple_sketch_agg_integer(key1, val1, 12, 'sum') as sketch FROM
t_int_int_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_integer(key2, val2, 12, 'sum') as sketch FROM
t_int_int_1_5_through_7_11);
+
+-- Test tuple_intersection_agg_double with IntegerType key sketches
+SELECT tuple_sketch_estimate_double(tuple_intersection_agg_double(sketch,
'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_int_double_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_int_double_1_5_through_7_11);
+
+-- Test tuple_intersection_agg_double with LongType key sketches
+SELECT tuple_sketch_estimate_double(tuple_intersection_agg_double(sketch,
'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_long_double_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_long_double_1_5_through_7_11);
+
+-- Test tuple_intersection_agg_double with FloatType key sketches
+SELECT tuple_sketch_estimate_double(tuple_intersection_agg_double(sketch,
'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_float_double_1_1_1_4_through_1_5_1_8
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_float_double_1_1_1_4_through_1_5_1_8);
+
+-- Test tuple_intersection_agg_double with DoubleType key sketches
+SELECT tuple_sketch_estimate_double(tuple_intersection_agg_double(sketch,
'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_double_double_1_1_1_4_through_1_5_1_8
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_double_double_1_1_1_4_through_1_5_1_8);
+
+-- Test tuple_intersection_agg_double with StringType key sketches
+SELECT tuple_sketch_estimate_double(tuple_intersection_agg_double(sketch,
'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_string_double_a_d_through_e_h
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_string_double_a_d_through_e_h);
+
+-- Test tuple_intersection_agg_double with BinaryType key sketches
+SELECT tuple_sketch_estimate_double(tuple_intersection_agg_double(sketch,
'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_binary_double_a_b_through_e_f
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_binary_double_a_b_through_e_f);
+
+-- Test tuple_intersection_agg_double with ArrayType(IntegerType) key sketches
+SELECT tuple_sketch_estimate_double(tuple_intersection_agg_double(sketch,
'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_array_int_double_1_3_through_4_6
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_array_int_double_1_3_through_4_6);
+
+-- Test tuple_intersection_agg_double with ArrayType(LongType) key sketches
+SELECT tuple_sketch_estimate_double(tuple_intersection_agg_double(sketch,
'sum'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_array_long_double_1_3_through_4_6
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_array_long_double_1_3_through_4_6);
+
+-- Test tuple_intersection_agg_integer with integer summary type
+SELECT tuple_sketch_estimate_integer(tuple_intersection_agg_integer(sketch,
'sum'))
+FROM (SELECT tuple_sketch_agg_integer(key1, val1, 12, 'sum') as sketch FROM
t_int_int_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_integer(key2, val2, 12, 'sum') as sketch FROM
t_int_int_1_5_through_7_11);
+
+-- Test tuple_sketch_agg_double with IntegerType key and null values (nulls
should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (1, 1.0D), (null, 2.0D), (2, 2.0D), (null, 3.0D), (3, 3.0D)
tab(key, val);
+
+-- Test tuple_sketch_agg_double with StringType key and null values (nulls
should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES ('test', 1.0D), (null, 2.0D), ('null', 3.0D), (null, 4.0D)
tab(key, val);
+
+-- Test tuple_sketch_agg_double with StringType key and Float summary with
null values (nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES ('test', 1.0F), (null, 2.0F), ('null', 3.0F), ('null', 4.0F)
tab(key, val);
+
+-- Test tuple_sketch_agg_double with LongType key and null values (nulls
should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (100L, 1.0D), (null, 2.0D), (200L, 3.0D), (null, 4.0D), (300L,
5.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_double with DoubleType key and null values (nulls
should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(CAST(key AS
DOUBLE), val))
+FROM VALUES (1.1, 1.0D), (null, 2.0D), (2.2, 3.0D), (null, 4.0D), (3.3, 5.0D)
tab(key, val);
+
+-- Test tuple_sketch_agg_double with FloatType key and null values (nulls
should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(CAST(key AS
FLOAT), val))
+FROM VALUES (1.5, 1.0D), (null, 2.0D), (2.5, 3.0D), (null, 4.0D), (3.5, 5.0D)
tab(key, val);
+
+-- Test tuple_sketch_agg_double with BinaryType key and null values (nulls
should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (X'AA', 1.0D), (null, 2.0D), (X'BB', 3.0D), (null, 4.0D), (X'CC',
5.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_double with ArrayType(IntegerType) key and null
values (nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (ARRAY(1, 2), 1.0D), (null, 2.0D), (ARRAY(3, 4), 3.0D), (null,
4.0D), (ARRAY(5, 6), 5.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_double with ArrayType(LongType) key and null values
(nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (ARRAY(10L, 20L), 1.0), (null, 2.0D), (ARRAY(30L, 40L), 3.0D),
(null, 4.0D), (ARRAY(50L, 60L), 5.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_double with arrays containing null elements
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (ARRAY(1, null), 1.0), (ARRAY(1), 2.0D), (ARRAY(2, null, 3),
3.0D), (ARRAY(4), 4.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_double with arrays containing null elements (LongType)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (ARRAY(10L, null), 1.0D), (ARRAY(10L), 2.0D), (ARRAY(20L, null,
30L), 3.0D), (ARRAY(40L), 4.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_double with null summary values (nulls should be
ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (1, 1.0D), (2, null), (3, 3.0D), (4, null), (5, 5.0D) tab(key,
val);
+
+-- Test tuple_sketch_agg_double with StringType key and null summary values
(nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES ('a', 1.0D), ('b', null), ('c', 3.0D), ('d', null) tab(key, val);
+
+-- Test tuple_sketch_agg_double with LongType key and null summary values
(nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (100L, 1.0D), (200L, null), (300L, 3.0D), (400L, null), (500L,
5.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_double with DoubleType key and null summary values
(nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(CAST(key AS
DOUBLE), val))
+FROM VALUES (1.1, 1.0D), (2.2, null), (3.3, 3.0D), (4.4, null) tab(key, val);
+
+-- Test tuple_sketch_agg_double with FloatType key and null summary values
(nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(CAST(key AS
FLOAT), val))
+FROM VALUES (1.5, 1.0D), (2.5, null), (3.5, 3.0D), (4.5, null) tab(key, val);
+
+-- Test tuple_sketch_agg_double with BinaryType key and null summary values
(nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (X'AA', 1.0D), (X'BB', null), (X'CC', 3.0D), (X'DD', null)
tab(key, val);
+
+-- Test tuple_sketch_agg_double with ArrayType(IntegerType) key and null
summary values (nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (ARRAY(1, 2), 1.0D), (ARRAY(3, 4), null), (ARRAY(5, 6), 3.0D),
(ARRAY(7, 8), null) tab(key, val);
+
+-- Test tuple_sketch_agg_double with ArrayType(LongType) key and null summary
values (nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (ARRAY(10L, 20L), 1.0D), (ARRAY(30L, 40L), null), (ARRAY(50L,
60L), 3.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_integer with null summary values (nulls should be
ignored)
+SELECT tuple_sketch_estimate_integer(tuple_sketch_agg_integer(key, val, 12))
+FROM VALUES (1, 1), (2, null), (3, 3), (4, null), (5, 5) tab(key, val);
+
+-- Test tuple_sketch_agg_double with both null keys and null summaries (all
nulls should be ignored)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (1, 1.0D), (null, 2.0D), (3, null), (null, null), (5, 5.0D)
tab(key, val);
+
+-- Test tuple_sketch_agg_double with all-null keys (should return null or
empty sketch)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (null, 1.0D), (null, 2.0D), (null, 3.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_double with all-null summaries (should return null or
empty sketch)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (1, null), (2, null), (3, null) tab(key, val);
+
+-- Test tuple_sketch_agg_double with all-null keys and summaries (should
return null or empty sketch)
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (null, null), (null, null), (null, null) tab(key, val);
+
+-- Test tuple_sketch_agg_integer with all-null keys (should return null or
empty sketch)
+SELECT tuple_sketch_estimate_integer(tuple_sketch_agg_integer(key, val, 12))
+FROM VALUES (null, 1), (null, 2), (null, 3) tab(key, val);
+
+-- Test tuple_sketch_agg_integer with all-null summaries (should return null
or empty sketch)
+SELECT tuple_sketch_estimate_integer(tuple_sketch_agg_integer(key, val, 12))
+FROM VALUES (1, null), (2, null), (3, null) tab(key, val);
+
+-- Test tuple_sketch_agg_integer with all-null keys and summaries (should
return null or empty sketch)
+SELECT tuple_sketch_estimate_integer(tuple_sketch_agg_integer(key, val, 12))
+FROM VALUES (null, null), (null, null), (null, null) tab(key, val);
+
+-- Test tuple_sketch_summary_double with all-null summaries
+SELECT tuple_sketch_summary_double(tuple_sketch_agg_double(key, val, 12,
'sum'))
+FROM VALUES (1, null), (2, null), (3, null) tab(key, val);
+
+-- Test tuple_sketch_summary_integer with all-null summaries
+SELECT tuple_sketch_summary_integer(tuple_sketch_agg_integer(key, val, 12,
'sum'), 'sum')
+FROM VALUES (1, null), (2, null), (3, null) tab(key, val);
+
+-- Test tuple_sketch_agg_double with empty arrays
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (ARRAY(), 1.0D), (ARRAY(1, 2), 2.0D), (ARRAY(), 3.0D), (ARRAY(3,
4), 4.0D) tab(key, val);
+
+-- Test tuple_sketch_agg_double with empty strings
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES ('', 1.0D), ('a', 2.0D), ('', 3.0D), ('b', 4.0D), ('c', 5.0D)
tab(key, val);
+
+-- Test tuple_sketch_agg_double with empty binary data
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key, val))
+FROM VALUES (X'', 1.0D), (X'01', 2.0D), (X'02', 3.0D), (X'03', 4.0D), (CAST('
' AS BINARY), 5.0D), (X'e280', 6.0D), (X'c1', 7.0D), (X'c120', 8.0D) tab(key,
val);
+
+-- Test tuple_sketch_agg_double with collated string data
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1, val1))
utf8_b FROM t_string_collation;
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1 COLLATE
UTF8_LCASE, val1)) utf8_lc FROM t_string_collation;
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1 COLLATE
UNICODE, val1)) unicode FROM t_string_collation;
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1 COLLATE
UNICODE_CI, val1)) unicode_ci FROM t_string_collation;
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1 COLLATE
UTF8_BINARY_RTRIM, val1)) utf8_b_rt FROM t_string_collation;
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1 COLLATE
UTF8_LCASE_RTRIM, val1)) utf8_lc_rt FROM t_string_collation;
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1 COLLATE
UNICODE_RTRIM, val1)) unicode_rt FROM t_string_collation;
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key1 COLLATE
UNICODE_CI_RTRIM, val1)) unicode_ci_rt FROM t_string_collation;
+
+-- Combined test exercising several double-summary tuple sketch functions
(agg, union, intersection, difference, estimate, summary) in a single query
+WITH sketches AS (
+ SELECT 'int_sketch' as sketch_type, tuple_sketch_agg_double(key1, val1, 12,
'sum') as sketch
+ FROM t_int_double_1_5_through_7_11
+ UNION ALL
+ SELECT 'long_sketch' as sketch_type, tuple_sketch_agg_double(key1, val1, 15,
'sum') as sketch
+ FROM t_long_double_1_5_through_7_11
+ UNION ALL
+ SELECT 'double_sketch' as sketch_type, tuple_sketch_agg_double(key1, val1,
10, 'sum') as sketch
+ FROM t_double_double_1_1_1_4_through_1_5_1_8
+ UNION ALL
+ SELECT 'string_sketch' as sketch_type, tuple_sketch_agg_double(key1, val1,
14, 'sum') as sketch
+ FROM t_string_double_a_d_through_e_h
+),
+union_result AS (
+ SELECT tuple_union_agg_double(sketch, 16, 'sum') as union_sketch FROM
sketches
+),
+individual_sketches AS (
+ SELECT
+ tuple_sketch_agg_double(key1, val1, 12, 'sum') as sketch1,
+ tuple_sketch_agg_double(key2, val2, 12, 'sum') as sketch2
+ FROM t_int_double_1_5_through_7_11
+)
+SELECT
+ -- Basic estimate from union of all sketches
+ tuple_sketch_estimate_double((SELECT union_sketch FROM union_result)) as
union_estimate,
+ -- Summary aggregation from union
+ tuple_sketch_summary_double((SELECT union_sketch FROM union_result), 'sum')
as union_summary,
+ -- Union of two individual sketches
+ tuple_sketch_estimate_double(tuple_union_double(sketch1, sketch2, 15,
'sum')) as binary_union_estimate,
+ -- Intersection of two individual sketches
+ tuple_sketch_estimate_double(tuple_intersection_double(sketch1, sketch2,
'sum')) as intersection_estimate,
+ -- Difference of two individual sketches
+ tuple_sketch_estimate_double(tuple_difference_double(sketch1, sketch2)) as
difference_estimate
+FROM individual_sketches;
+
+-- Named parameter tests for tuple sketch functions
+
+-- Test tuple_sketch_agg_double with named parameters - only required params
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key => key1,
summary => val1))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with named parameters - setting only
lgNomEntries
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key => key1,
summary => val1, lgNomEntries => 14))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with named parameters - setting only mode
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key => key1,
summary => val1, mode => 'max'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with named parameters - setting both
lgNomEntries and mode
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(key => key1,
summary => val1, lgNomEntries => 10, mode => 'min'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_double with named parameters - different order
+SELECT tuple_sketch_estimate_double(tuple_sketch_agg_double(mode => 'max',
lgNomEntries => 15, summary => val1, key => key1))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_integer with named parameters - only required params
+SELECT tuple_sketch_estimate_integer(tuple_sketch_agg_integer(key => key1,
summary => val1))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_integer with named parameters - setting only mode
+SELECT tuple_sketch_estimate_integer(tuple_sketch_agg_integer(key => key1,
summary => val1, mode => 'max'))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_sketch_agg_integer with named parameters - different order
+SELECT tuple_sketch_estimate_integer(tuple_sketch_agg_integer(lgNomEntries =>
14, key => key1, mode => 'sum', summary => val1))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_union_agg_double with named parameters - only required param
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(child => sketch))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_int_double_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_int_double_1_5_through_7_11);
+
+-- Test tuple_union_agg_double with named parameters - setting only
lgNomEntries
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(child => sketch,
lgNomEntries => 14))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_int_double_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_int_double_1_5_through_7_11);
+
+-- Test tuple_union_agg_double with named parameters - setting only mode
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(child => sketch,
mode => 'max'))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_int_double_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_int_double_1_5_through_7_11);
+
+-- Test tuple_union_agg_double with named parameters - different order
+SELECT tuple_sketch_estimate_double(tuple_union_agg_double(mode => 'min',
lgNomEntries => 13, child => sketch))
+FROM (SELECT tuple_sketch_agg_double(key1, val1) as sketch FROM
t_int_double_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_double(key2, val2) as sketch FROM
t_int_double_1_5_through_7_11);
+
+-- Test tuple_union_agg_integer with named parameters - setting only mode
+SELECT tuple_sketch_estimate_integer(tuple_union_agg_integer(child => sketch,
mode => 'max'))
+FROM (SELECT tuple_sketch_agg_integer(key1, val1, 12, 'sum') as sketch FROM
t_int_int_1_5_through_7_11
+ UNION ALL
+ SELECT tuple_sketch_agg_integer(key2, val2, 12, 'sum') as sketch FROM
t_int_int_1_5_through_7_11);
+
+-- Test tuple_union_double with named parameters - only required params
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ first => tuple_sketch_agg_double(key1, val1),
+ second => tuple_sketch_agg_double(key2, val2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_double with named parameters - setting only lgNomEntries
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ first => tuple_sketch_agg_double(key1, val1),
+ second => tuple_sketch_agg_double(key2, val2),
+ lgNomEntries => 14))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_double with named parameters - setting only mode
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ first => tuple_sketch_agg_double(key1, val1),
+ second => tuple_sketch_agg_double(key2, val2),
+ mode => 'max'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_double with named parameters - different order
+SELECT tuple_sketch_estimate_double(
+ tuple_union_double(
+ mode => 'min',
+ lgNomEntries => 15,
+ second => tuple_sketch_agg_double(key2, val2),
+ first => tuple_sketch_agg_double(key1, val1)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_double with named parameters - setting only mode
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ first => tuple_sketch_agg_double(key1, val1),
+ second => theta_sketch_agg(key2),
+ mode => 'max'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_double with named parameters - different order
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ mode => 'min',
+ second => theta_sketch_agg(key2),
+ lgNomEntries => 14,
+ first => tuple_sketch_agg_double(key1, val1)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_integer with named parameters - setting only lgNomEntries
+SELECT tuple_sketch_estimate_integer(
+ tuple_union_integer(
+ first => tuple_sketch_agg_integer(key1, val1),
+ second => tuple_sketch_agg_integer(key2, val2),
+ lgNomEntries => 14))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_union_theta_integer with named parameters - different order
+SELECT tuple_sketch_estimate_integer(
+ tuple_union_theta_integer(
+ lgNomEntries => 13,
+ mode => 'max',
+ second => theta_sketch_agg(key2),
+ first => tuple_sketch_agg_integer(key1, val1)))
+FROM t_int_int_1_5_through_7_11;
+
+-- Negative test cases
Review Comment:
More testing ideas:
<img width="880" height="1662" alt="image"
src="https://github.com/user-attachments/assets/9a823be4-e45d-4e32-a521-547149592191"
/>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]