cboumalh commented on code in PR #52883:
URL: https://github.com/apache/spark/pull/52883#discussion_r2620485978
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ThetaSketchUtils.scala:
##########
@@ -54,21 +68,213 @@ object ThetaSketchUtils {
}
/**
- * Wraps a byte array into a DataSketches CompactSketch object.
- * This method safely deserializes a compact Theta sketch from its binary representation,
- * handling potential deserialization errors by throwing appropriate Spark SQL exceptions.
+ * Validates the mode parameter. Throws a Spark SQL exception if the mode is invalid.
*
- * @param bytes The binary representation of a compact theta sketch
- * @param prettyName The display name of the function/expression for error messages
- * @return A CompactSketch object wrapping the provided bytes
+ * @param mode
+ * The mode string to validate
+ * @param prettyName
+ * The display name of the function/expression for error messages
*/
- def wrapCompactSketch(bytes: Array[Byte], prettyName: String): CompactSketch = {
- val memory = try {
- Memory.wrap(bytes)
- } catch {
- case _: NullPointerException | _: MemoryBoundsException =>
- throw QueryExecutionErrors.thetaInvalidInputSketchBuffer(prettyName)
+ def checkMode(mode: String, prettyName: String): Unit = {
+ if (!VALID_MODES.contains(mode)) {
+ throw QueryExecutionErrors.tupleInvalidMode(prettyName, mode, VALID_MODES)
+ }
+ }
+
+ /**
+ * Converts the mode string input to DoubleSummary.Mode enum. Used for double summary type
+ * operations.
+ *
+ * @param modeInput
+ * The mode string to convert
+ * @return
+ * The corresponding DoubleSummary.Mode enum value
+ */
+ def getDoubleSummaryMode(modeInput: String): DoubleSummary.Mode = {
+ modeInput match {
+ case MODE_SUM => DoubleSummary.Mode.Sum
+ case MODE_MIN => DoubleSummary.Mode.Min
+ case MODE_MAX => DoubleSummary.Mode.Max
+ case MODE_ALWAYSONE => DoubleSummary.Mode.AlwaysOne
+ }
+ }
+
+ /**
+ * Converts the mode string input to IntegerSummary.Mode enum. Used for integer summary type
+ * operations.
+ *
+ * @param modeInput
+ * The mode string to convert
+ * @return
+ * The corresponding IntegerSummary.Mode enum value
+ */
+ def getIntegerSummaryMode(modeInput: String): IntegerSummary.Mode = {
+ modeInput match {
+ case MODE_SUM => IntegerSummary.Mode.Sum
+ case MODE_MIN => IntegerSummary.Mode.Min
+ case MODE_MAX => IntegerSummary.Mode.Max
+ case MODE_ALWAYSONE => IntegerSummary.Mode.AlwaysOne
}
+ }
+
+ def aggregateNumericSummaries[S <: Summary, V](
+ iterator: TupleSketchIterator[S],
+ mode: String,
+ getValue: TupleSketchIterator[S] => V)(implicit num: Numeric[V]): V = {
+
+ mode match {
+ case MODE_SUM =>
+ var sum = num.zero
+ while (iterator.next()) {
+ sum = num.plus(sum, getValue(iterator))
+ }
+ sum
+
+ case MODE_MIN =>
+ var min: Option[V] = None
+ while (iterator.next()) {
+ val value = getValue(iterator)
+ min = min match {
+ case Some(m) => Some(num.min(m, value))
+ case None => Some(value)
+ }
+ }
+ min.getOrElse(num.zero)
+
+ case MODE_MAX =>
+ var max: Option[V] = None
+ while (iterator.next()) {
+ val value = getValue(iterator)
+ max = max match {
+ case Some(m) => Some(num.max(m, value))
+ case None => Some(value)
+ }
+ }
+ max.getOrElse(num.zero)
+
+ case MODE_ALWAYSONE =>
+ var count = num.zero
+ while (iterator.next()) {
+ count = num.plus(count, num.one)
+ }
+ count
+ }
+ }
+
+ /**
+ * Deserializes a Double summary type binary tuple sketch representation into a CompactSketch.
+ *
+ * @param bytes
+ * The binary sketch data to deserialize
+ * @param prettyName
+ * The display name of the function/expression for error messages
+ * @return
+ * A deserialized sketch
+ */
+ def heapifyDoubleTupleSketch(bytes: Array[Byte], prettyName: String): Sketch[DoubleSummary] = {
+ val memory =
+ try {
+ Memory.wrap(bytes)
+ } catch {
+ case _: NullPointerException | _: MemoryBoundsException =>
+ throw QueryExecutionErrors.thetaInvalidInputSketchBuffer(prettyName)
+ }
+
+ val sketch =
+ try {
+ Sketches.heapifySketch(memory, new DoubleSummaryDeserializer())
+ } catch {
+ case e: Exception =>
+ throw QueryExecutionErrors.tupleInvalidInputSketchBuffer(prettyName, e.getMessage)
+ }
+
+ sketch
+ }
+
+ /**
+ * Deserializes a Integer summary type binary tuple sketch representation into a CompactSketch.
+ *
+ * @param bytes
+ * The binary sketch data to deserialize
+ * @param prettyName
+ * The display name of the function/expression for error messages
+ * @return
+ * A deserialized sketch
+ */
+ def heapifyIntegerTupleSketch(
+ bytes: Array[Byte],
+ prettyName: String): Sketch[IntegerSummary] = {
+ val memory =
+ try {
+ Memory.wrap(bytes)
+ } catch {
+ case _: NullPointerException | _: MemoryBoundsException =>
+ throw QueryExecutionErrors.thetaInvalidInputSketchBuffer(prettyName)
+ }
+
+ val sketch =
+ try {
+ Sketches.heapifySketch(memory, new IntegerSummaryDeserializer())
+ } catch {
+ case e: Exception =>
+ throw QueryExecutionErrors.tupleInvalidInputSketchBuffer(prettyName, e.getMessage)
+ }
+
+ sketch
+ }
+
+ /**
+ * Deserializes a String summary type binary tuple sketch representation into a CompactSketch.
+ *
+ * @param bytes
+ * The binary sketch data to deserialize
+ * @param prettyName
+ * The display name of the function/expression for error messages
+ * @return
+ * A deserialized sketch
+ */
+ def heapifyStringTupleSketch(
+ bytes: Array[Byte],
+ prettyName: String): Sketch[ArrayOfStringsSummary] = {
+ val memory =
+ try {
+ Memory.wrap(bytes)
+ } catch {
+ case _: NullPointerException | _: MemoryBoundsException =>
+ throw QueryExecutionErrors.thetaInvalidInputSketchBuffer(prettyName)
+ }
+
+ val sketch =
+ try {
+ Sketches.heapifySketch(memory, new ArrayOfStringsSummaryDeserializer())
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]