dtenedor commented on code in PR #53548:
URL: https://github.com/apache/spark/pull/53548#discussion_r2673462039


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala:
##########
@@ -449,6 +449,414 @@ case class KllSketchAggDouble(
   }
 }
 
+/**
+ * The KllMergeAggBigint function merges multiple Apache DataSketches KllLongsSketch instances
+ * that have been serialized to binary format. This is useful for combining sketches created
+ * in separate aggregations (e.g., from different partitions or time windows).
+ * It outputs the merged binary representation of the KllLongsSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/KLL/KLLSketch.html]] for more information.
+ *
+ * @param child
+ *   child expression containing binary KllLongsSketch representations to merge
+ * @param kExpr
+ *   optional expression for the k parameter from the Apache DataSketches library that controls
+ *   the size and accuracy of the sketch. Must be a constant integer between 8 and 65535.
+ *   Default is 200 (normalized rank error ~1.65%). Larger k values provide more accurate
+ *   estimates but result in larger, slower sketches.
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, k]) - Merges binary KllLongsSketch representations and returns the merged sketch.
+      The input expression should contain binary sketch representations (e.g., from kll_sketch_agg_bigint).
+      The optional k parameter controls the size and accuracy of the merged sketch (default 200, range 8-65535).
+  """,
+  examples = """
+    Examples:
+      > SELECT kll_sketch_get_n_bigint(_FUNC_(sketch)) FROM (SELECT kll_sketch_agg_bigint(col) as sketch FROM VALUES (1), (2), (3) tab(col) UNION ALL SELECT kll_sketch_agg_bigint(col) as sketch FROM VALUES (4), (5), (6) tab(col)) t;
+       6
+  """,
+  group = "agg_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class KllMergeAggBigint(
+    child: Expression,
+    kExpr: Option[Expression] = None,
+    override val mutableAggBufferOffset: Int = 0,
+    override val inputAggBufferOffset: Int = 0)
+    extends TypedImperativeAggregate[KllLongsSketch]
+    with KllSketchAggBase
+    with ExpectsInputTypes {
+  def this(child: Expression) = this(child, None, 0, 0)
+  def this(child: Expression, kExpr: Expression) = this(child, Some(kExpr), 0, 0)
+
+  override def children: Seq[Expression] = child +: kExpr.toSeq
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): KllMergeAggBigint =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): KllMergeAggBigint =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): KllMergeAggBigint = {
+    if (newChildren.length == 1) {
+      copy(child = newChildren(0), kExpr = None)
+    } else {
+      copy(child = newChildren(0), kExpr = Some(newChildren(1)))
+    }
+  }
+
+  override def dataType: DataType = BinaryType
+  override def inputTypes: Seq[AbstractDataType] = {
+    val baseTypes = Seq(BinaryType)
+    if (kExpr.isDefined) baseTypes :+ IntegerType else baseTypes
+  }
+  override def nullable: Boolean = false
+  override def prettyName: String = "kll_merge_agg_bigint"
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      return defaultCheck
+    }
+    checkKInputDataTypes()
+  }
+
+  override def createAggregationBuffer(): KllLongsSketch =
+    KllLongsSketch.newHeapInstance(kValue)
+
+  /**
+   * Evaluate the input row and deserialize the binary KllLongsSketch, then merge it into
+   * the current aggregation buffer.
+   * Note, null values are ignored.
+   */
+  override def update(sketch: KllLongsSketch, input: InternalRow): KllLongsSketch = {
+    // Return early for null values.
+    val v = child.eval(input)
+    if (v == null) {
+      return sketch
+    }
+
+    try {
+      val sketchBytes = v.asInstanceOf[Array[Byte]]
+      val inputSketch = KllLongsSketch.wrap(Memory.wrap(sketchBytes))
+      sketch.merge(inputSketch)
+    } catch {
+      case e: Exception =>
+        throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName, e.getMessage)
+    }
+
+    sketch
+  }
+
+  /** Merges an input sketch into the current aggregation buffer. */
+  override def merge(updateBuffer: KllLongsSketch, input: KllLongsSketch): KllLongsSketch = {
+    try {
+      updateBuffer.merge(input)
+      updateBuffer
+    } catch {
+      case e: Exception =>
+        throw QueryExecutionErrors.kllSketchIncompatibleMergeError(prettyName, e.getMessage)
+    }
+  }
+
+  /** Returns a sketch derived from the input column or expression. */
+  override def eval(sketch: KllLongsSketch): Any = sketch.toByteArray
+
+  /** Converts the underlying sketch state into a byte array. */
+  override def serialize(sketch: KllLongsSketch): Array[Byte] = sketch.toByteArray
+
+  /** Wraps the byte array into a sketch instance. */
+  override def deserialize(buffer: Array[Byte]): KllLongsSketch = if (buffer.nonEmpty) {
+    try {
+      KllLongsSketch.heapify(Memory.wrap(buffer))
+    } catch {
+      case e: Exception =>
+        throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName, e.getMessage)
+    }
+  } else {
+    createAggregationBuffer()
+  }
+}
+
+/**
+ * The KllMergeAggFloat function merges multiple Apache DataSketches KllFloatsSketch instances
+ * that have been serialized to binary format. This is useful for combining sketches created
+ * in separate aggregations (e.g., from different partitions or time windows).
+ * It outputs the merged binary representation of the KllFloatsSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/KLL/KLLSketch.html]] for more information.
+ *
+ * @param child
+ *   child expression containing binary KllFloatsSketch representations to merge
+ * @param kExpr
+ *   optional expression for the k parameter from the Apache DataSketches library that controls
+ *   the size and accuracy of the sketch. Must be a constant integer between 8 and 65535.
+ *   Default is 200 (normalized rank error ~1.65%). Larger k values provide more accurate
+ *   estimates but result in larger, slower sketches.
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, k]) - Merges binary KllFloatsSketch representations and returns the merged sketch.
+      The input expression should contain binary sketch representations (e.g., from kll_sketch_agg_float).
+      The optional k parameter controls the size and accuracy of the merged sketch (default 200, range 8-65535).
+  """,
+  examples = """
+    Examples:
+      > SELECT kll_sketch_get_n_float(_FUNC_(sketch)) FROM (SELECT kll_sketch_agg_float(col) as sketch FROM VALUES (CAST(1.0 AS FLOAT)), (CAST(2.0 AS FLOAT)), (CAST(3.0 AS FLOAT)) tab(col) UNION ALL SELECT kll_sketch_agg_float(col) as sketch FROM VALUES (CAST(4.0 AS FLOAT)), (CAST(5.0 AS FLOAT)), (CAST(6.0 AS FLOAT)) tab(col)) t;
+       6
+  """,
+  group = "agg_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class KllMergeAggFloat(
+    child: Expression,
+    kExpr: Option[Expression] = None,
+    override val mutableAggBufferOffset: Int = 0,
+    override val inputAggBufferOffset: Int = 0)
+    extends TypedImperativeAggregate[KllFloatsSketch]
+    with KllSketchAggBase
+    with ExpectsInputTypes {
+  def this(child: Expression) = this(child, None, 0, 0)
+  def this(child: Expression, kExpr: Expression) = this(child, Some(kExpr), 0, 0)
+
+  override def children: Seq[Expression] = child +: kExpr.toSeq
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): KllMergeAggFloat =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): KllMergeAggFloat =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): KllMergeAggFloat = {
+    if (newChildren.length == 1) {
+      copy(child = newChildren(0), kExpr = None)
+    } else {
+      copy(child = newChildren(0), kExpr = Some(newChildren(1)))
+    }
+  }
+
+  override def dataType: DataType = BinaryType
+  override def inputTypes: Seq[AbstractDataType] = {
+    val baseTypes = Seq(BinaryType)
+    if (kExpr.isDefined) baseTypes :+ IntegerType else baseTypes

Review Comment:
   Replied above, we can continue to support the k parameter as optional.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to