dtenedor commented on code in PR #52883:
URL: https://github.com/apache/spark/pull/52883#discussion_r2684152953


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/tuplesketchesExpressions.scala:
##########
@@ -0,0 +1,1070 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.datasketches.tuple.{AnotB, Intersection, Summary, 
SummarySetOperations, TupleSketchIterator, Union}
+import org.apache.datasketches.tuple.adouble.{DoubleSummary, 
DoubleSummaryFactory, DoubleSummarySetOperations}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary, 
IntegerSummaryFactory, IntegerSummarySetOperations}
+
+import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature, 
InputParameter}
+import org.apache.spark.sql.catalyst.util.{ThetaSketchUtils, TupleSummaryMode}
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, 
DoubleType, IntegerType, LongType}
+import org.apache.spark.unsafe.types.UTF8String
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(child) - Returns the estimated number of unique values
+    given the binary representation of a Datasketches TupleSketch. The sketch's
+    summary type must be a double. """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(tuple_sketch_agg_double(key, summary)) FROM VALUES (1, 
1.0D), (1, 2.0D), (2, 3.0D) tab(key, summary);
+       2.0
+  """,
+  group = "misc_funcs",
+  since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleSketchEstimateDouble(child: Expression)
+    extends UnaryExpression
+    with CodegenFallback
+    with ExpectsInputTypes {
+
+  override def nullIntolerant: Boolean = true
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override def dataType: DataType = DoubleType
+
+  override def prettyName: String = "tuple_sketch_estimate_double"
+
+  override protected def withNewChildInternal(newChild: Expression): 
TupleSketchEstimateDouble =
+    copy(child = newChild)
+
+  override def nullSafeEval(input: Any): Any = {
+    val buffer = input.asInstanceOf[Array[Byte]]
+    val sketch = ThetaSketchUtils.heapifyDoubleTupleSketch(buffer, prettyName)
+    sketch.getEstimate()
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(child) - Returns the estimated number of unique values
+    given the binary representation of a Datasketches TupleSketch. The sketch's
+    summary type must be an integer. """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(tuple_sketch_agg_integer(key, summary)) FROM VALUES (1, 
1), (1, 2), (2, 3) tab(key, summary);
+       2.0
+  """,
+  group = "misc_funcs",
+  since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleSketchEstimateInteger(child: Expression)
+    extends UnaryExpression
+    with CodegenFallback
+    with ExpectsInputTypes {
+
+  override def nullIntolerant: Boolean = true
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override def dataType: DataType = DoubleType
+
+  override def prettyName: String = "tuple_sketch_estimate_integer"
+
+  override protected def withNewChildInternal(newChild: Expression): 
TupleSketchEstimateInteger =
+    copy(child = newChild)
+
+  override def nullSafeEval(input: Any): Any = {
+    val buffer = input.asInstanceOf[Array[Byte]]
+    val sketch = ThetaSketchUtils.heapifyIntegerTupleSketch(buffer, prettyName)
+    sketch.getEstimate()
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(child, mode) - Aggregates the summary values from a double summary 
type
+    Datasketches TupleSketch. The mode can be 'sum', 'min', 'max', or 
'alwaysone'
+    (defaults to 'sum'). """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(tuple_sketch_agg_double(key, summary)) FROM VALUES (1, 
1.0D), (1, 2.0D), (2, 3.0D) tab(key, summary);
+       6.0
+  """,
+  group = "misc_funcs",
+  since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleSketchSummaryDouble(left: Expression, right: Expression)
+    extends BinaryExpression
+    with CodegenFallback
+    with ExpectsInputTypes {
+
+  def this(child: Expression) = {
+    this(child, Literal(TupleSummaryMode.Sum.toString))
+  }
+
+  override def nullIntolerant: Boolean = true
+
+  override def inputTypes: Seq[AbstractDataType] =
+    Seq(BinaryType, StringTypeWithCollation(supportsTrimCollation = true))
+
+  override def dataType: DataType = DoubleType
+
+  override def prettyName: String = "tuple_sketch_summary_double"
+
+  override protected def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression): TupleSketchSummaryDouble =
+    copy(left = newFirst, right = newSecond)
+
+  override def nullSafeEval(input: Any, modeInput: Any): Any = {
+    val buffer = input.asInstanceOf[Array[Byte]]
+    val modeStr = modeInput.asInstanceOf[UTF8String].toString
+
+    // Parse and validate mode in one step
+    val mode = TupleSummaryMode.fromString(modeStr, prettyName)
+
+    val sketch = ThetaSketchUtils.heapifyDoubleTupleSketch(buffer, prettyName)
+
+    ThetaSketchUtils.aggregateNumericSummaries[DoubleSummary, Double](
+      sketch.iterator(),
+      mode,
+      (it: TupleSketchIterator[DoubleSummary]) => it.getSummary.getValue)
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(child, mode) - Aggregates the summary values from a integer summary 
type
+    Datasketches TupleSketch. The mode can be 'sum', 'min', 'max', or 
'alwaysone'
+    (defaults to 'sum'). """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(tuple_sketch_agg_integer(key, summary)) FROM VALUES (1, 
1), (1, 2), (2, 3) tab(key, summary);
+       6
+  """,
+  group = "misc_funcs",
+  since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleSketchSummaryInteger(left: Expression, right: Expression)
+    extends BinaryExpression
+    with CodegenFallback
+    with ExpectsInputTypes {
+
+  def this(child: Expression) = {
+    this(child, Literal(TupleSummaryMode.Sum.toString))
+  }
+
+  override def nullIntolerant: Boolean = true
+
+  override def inputTypes: Seq[AbstractDataType] =
+    Seq(BinaryType, StringTypeWithCollation(supportsTrimCollation = true))
+
+  override def dataType: DataType = LongType
+
+  override def prettyName: String = "tuple_sketch_summary_integer"
+
+  override protected def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression): TupleSketchSummaryInteger =
+    copy(left = newFirst, right = newSecond)
+
+  override def nullSafeEval(input: Any, modeInput: Any): Any = {
+    val buffer = input.asInstanceOf[Array[Byte]]
+    val modeStr = modeInput.asInstanceOf[UTF8String].toString
+
+    // Parse and validate mode in one step
+    val mode = TupleSummaryMode.fromString(modeStr, prettyName)
+
+    val sketch = ThetaSketchUtils.heapifyIntegerTupleSketch(buffer, prettyName)
+
+    ThetaSketchUtils.aggregateNumericSummaries[IntegerSummary, Long](
+      sketch.iterator(),
+      mode,
+      (it: TupleSketchIterator[IntegerSummary]) => 
it.getSummary.getValue.toLong)
+  }
+}
+
+case class TupleUnionDouble(

Review Comment:
   Does this class need a Scaladoc comment? Same for the classes below?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ThetaSketchUtils.scala:
##########
@@ -17,12 +17,81 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.util.Locale
+
 import org.apache.datasketches.common.SketchesArgumentException
 import org.apache.datasketches.memory.{Memory, MemoryBoundsException}
 import org.apache.datasketches.theta.CompactSketch
+import org.apache.datasketches.tuple.{Sketch, Sketches, Summary, 
TupleSketchIterator}
+import org.apache.datasketches.tuple.adouble.{DoubleSummary, 
DoubleSummaryDeserializer}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary, 
IntegerSummaryDeserializer}
 
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
+/**
+ * Sealed trait representing valid summary modes for tuple sketches. This 
provides type-safe
+ * mode handling with compile-time exhaustiveness checking and prevents 
invalid modes from
+ * being created.
+ */
+sealed trait TupleSummaryMode {

Review Comment:
   Should this new code go into a new file `tupleSketchUtils.scala` instead of 
the existing `ThetaSketchUtils.scala`, which is specialized for Theta 
sketches?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/tuplesketchesAggregates.scala:
##########
@@ -0,0 +1,1487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.tuple.{Intersection, Sketch, Summary, 
SummaryFactory, SummarySetOperations, Union, UpdatableSketch, 
UpdatableSketchBuilder, UpdatableSummary}
+import org.apache.datasketches.tuple.adouble.{DoubleSummary, 
DoubleSummaryFactory, DoubleSummarySetOperations}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary, 
IntegerSummaryFactory, IntegerSummarySetOperations}
+
+import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, 
TypeCheckResult}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
ExpressionDescription, ImplicitCastInputTypes, Literal}
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
+import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature, 
InputParameter}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, QuaternaryLike, 
TernaryLike}
+import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory, 
ThetaSketchUtils}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.{AbstractDataType, ArrayType, BinaryType, 
DataType, DoubleType, FloatType, IntegerType, LongType, StringType, 
TypeCollection}
+import org.apache.spark.unsafe.types.UTF8String
+
+sealed trait TupleSketchState[S <: Summary] {
+  def serialize(): Array[Byte]
+  def eval(): Array[Byte]
+}
+case class UpdatableTupleSketchBuffer[U, S <: UpdatableSummary[U]](sketch: 
UpdatableSketch[U, S])
+    extends TupleSketchState[S] {
+  override def serialize(): Array[Byte] = sketch.compact.toByteArray
+  override def eval(): Array[Byte] = sketch.compact.toByteArray
+
+  /** Returns compact form of the sketch, needed for merge operations that 
require Sketch type. */
+  def compactSketch: Sketch[S] = sketch.compact
+}
+case class UnionTupleAggregationBuffer[S <: Summary](union: Union[S])
+    extends TupleSketchState[S] {
+  override def serialize(): Array[Byte] = union.getResult.toByteArray
+  override def eval(): Array[Byte] = union.getResult.toByteArray
+}
+case class IntersectionTupleAggregationBuffer[S <: Summary](intersection: 
Intersection[S])
+    extends TupleSketchState[S] {
+  override def serialize(): Array[Byte] = intersection.getResult.toByteArray
+  override def eval(): Array[Byte] = intersection.getResult.toByteArray
+}
+case class FinalizedTupleSketch[S <: Summary](sketch: Sketch[S]) extends 
TupleSketchState[S] {
+  override def serialize(): Array[Byte] = sketch.toByteArray
+  override def eval(): Array[Byte] = sketch.toByteArray
+}
+
+/**
+ * The TupleSketchAggDouble function utilizes a Datasketches TupleSketch 
instance to count a
+ * probabilistic approximation of the number of unique values in a given 
column with associated
+ * double type summary values that can be aggregated using different modes 
(sum, min, max,
+ * alwaysone), and outputs the binary representation of the TupleSketch.
+ *
+ * Keys are hashed internally based on their type and value - the same logical 
value in different
+ * types (e.g., String("123") and Int(123)) will be treated as distinct keys. 
However, summary
+ * value types must be consistent across all calls; mixing types can produce 
incorrect results or
+ * precision loss. The value type suffix in the function name (e.g., _double) 
ensures type safety.
+ *
+ * See [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]] for 
more information.
+ *
+ * @param key
+ *   key expression against which unique counting will occur
+ * @param summary
+ *   summary expression (double type) against which different mode 
aggregations will occur
+ * @param lgNomEntries
+ *   the log-base-2 of nomEntries decides the number of buckets for the sketch
+ * @param mode
+ *   the aggregation mode for numeric summaries (sum, min, max, alwaysone)
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+case class TupleSketchAggDouble(

Review Comment:
   OK, sounds good.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala:
##########
@@ -816,6 +822,37 @@ object FunctionRegistry {
     expression[ThetaDifference]("theta_difference"),
     expression[ThetaIntersection]("theta_intersection"),
     expression[ApproxTopKEstimate]("approx_top_k_estimate"),
+    expression[KllSketchToStringBigint]("kll_sketch_to_string_bigint"),

Review Comment:
   Do we need to add KLL-sketch-related expressions here in this PR?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/tuplesketchesAggregates.scala:
##########
@@ -0,0 +1,1487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.tuple.{Intersection, Sketch, Summary, 
SummaryFactory, SummarySetOperations, Union, UpdatableSketch, 
UpdatableSketchBuilder, UpdatableSummary}
+import org.apache.datasketches.tuple.adouble.{DoubleSummary, 
DoubleSummaryFactory, DoubleSummarySetOperations}
+import org.apache.datasketches.tuple.aninteger.{IntegerSummary, 
IntegerSummaryFactory, IntegerSummarySetOperations}
+
+import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, 
TypeCheckResult}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
ExpressionDescription, ImplicitCastInputTypes, Literal}
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
+import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature, 
InputParameter}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, QuaternaryLike, 
TernaryLike}
+import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory, 
ThetaSketchUtils, TupleSummaryMode}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
+import org.apache.spark.sql.types.{AbstractDataType, ArrayType, BinaryType, 
DataType, DoubleType, FloatType, IntegerType, LongType, StringType, 
TypeCollection}
+import org.apache.spark.unsafe.types.UTF8String
+
+sealed trait TupleSketchState[S <: Summary] {
+  def serialize(): Array[Byte]
+  def eval(): Array[Byte]
+}
+case class UpdatableTupleSketchBuffer[U, S <: UpdatableSummary[U]](sketch: 
UpdatableSketch[U, S])
+    extends TupleSketchState[S] {
+  override def serialize(): Array[Byte] = sketch.compact.toByteArray
+  override def eval(): Array[Byte] = sketch.compact.toByteArray
+
+  /** Returns compact form of the sketch, needed for merge operations that 
require Sketch type. */
+  def compactSketch: Sketch[S] = sketch.compact
+}
+case class UnionTupleAggregationBuffer[S <: Summary](union: Union[S])
+    extends TupleSketchState[S] {
+  override def serialize(): Array[Byte] = union.getResult.toByteArray
+  override def eval(): Array[Byte] = union.getResult.toByteArray
+}
+case class IntersectionTupleAggregationBuffer[S <: Summary](intersection: 
Intersection[S])
+    extends TupleSketchState[S] {
+  override def serialize(): Array[Byte] = intersection.getResult.toByteArray
+  override def eval(): Array[Byte] = intersection.getResult.toByteArray
+}
+case class FinalizedTupleSketch[S <: Summary](sketch: Sketch[S]) extends 
TupleSketchState[S] {
+  override def serialize(): Array[Byte] = sketch.toByteArray
+  override def eval(): Array[Byte] = sketch.toByteArray
+}
+
+/**
+ * The TupleSketchAggDouble function utilizes a Datasketches TupleSketch 
instance to count a
+ * probabilistic approximation of the number of unique values in a given 
column with associated
+ * double type summary values that can be aggregated using different modes 
(sum, min, max,
+ * alwaysone), and outputs the binary representation of the TupleSketch.
+ *
+ * Keys are hashed internally based on their type and value - the same logical 
value in different
+ * types (e.g., String("123") and Int(123)) will be treated as distinct keys. 
However, summary
+ * value types must be consistent across all calls; mixing types can produce 
incorrect results or
+ * precision loss. The value type suffix in the function name (e.g., _double) 
ensures type safety.
+ *
+ * See [[https://datasketches.apache.org/docs/Tuple/TupleSketches.html]] for 
more information.
+ *
+ * @param key
+ *   key expression against which unique counting will occur
+ * @param summary
+ *   summary expression (double type) against which different mode 
aggregations will occur
+ * @param lgNomEntries
+ *   the log-base-2 of nomEntries decides the number of buckets for the sketch
+ * @param mode
+ *   the aggregation mode for numeric summaries (sum, min, max, alwaysone)
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+case class TupleSketchAggDouble(

Review Comment:
   Some of the new files in this PR (including this one) are pretty large. Do 
you think we can split some of them into smaller files, each with a single 
focus, to make the code easier to read?
   
   We could split up `tuplesketchesAggregates.scala` into:
   * `tupleSketchAgg.scala` for the TupleSketchAgg* classes
   * `tupleUnionAgg.scala` for the TupleUnionAgg* classes
   * `tupleIntersectionAgg.scala` for the TupleIntersectionAgg* classes
   * `tupleAggUtils.scala` for everything else
   
   We could split up `tuplesketchesExpressions.scala` into:
   * `tupleSketchEstimate.scala` for the TupleSketchEstimate* classes
   * `tupleSketchSummary.scala` for the TupleSketchSummary* classes
   * `tupleUnion.scala` for the TupleUnion* classes
   * `tupleIntersection.scala` for the TupleIntersection* classes
   * `tupleDifference.scala` for the TupleDifference* classes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to