cboumalh commented on code in PR #52883:
URL: https://github.com/apache/spark/pull/52883#discussion_r2627648537


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ThetaSketchUtilsSuite.scala:
##########
@@ -114,7 +110,242 @@ class ThetaSketchUtilsSuite extends SparkFunSuite with 
SQLHelper {
         ThetaSketchUtils.wrapCompactSketch(invalidBytes, "test_function")
       },
       condition = "THETA_INVALID_INPUT_SKETCH_BUFFER",
-      parameters = Map("function" -> "`test_function`")
-    )
+      parameters = Map("function" -> "`test_function`"))
+  }
+
+  test("checkMode: accepts valid modes") {
+    val validModes = Seq(
+      ThetaSketchUtils.MODE_SUM,
+      ThetaSketchUtils.MODE_MIN,
+      ThetaSketchUtils.MODE_MAX,
+      ThetaSketchUtils.MODE_ALWAYSONE)
+    validModes.foreach { mode =>
+      // Should not throw any exception
+      ThetaSketchUtils.checkMode(mode, "test_function")
+    }
+  }
+
+  test("checkMode: throws exception for invalid modes") {
+    val invalidModes = Seq("invalid", "average", "count", "multiply", "")
+    invalidModes.foreach { mode =>
+      checkError(
+        exception = intercept[SparkRuntimeException] {
+          ThetaSketchUtils.checkMode(mode, "test_function")
+        },
+        condition = "TUPLE_INVALID_SKETCH_MODE",
+        parameters = Map(
+          "function" -> "`test_function`",
+          "mode" -> mode,
+          "validModes" -> ThetaSketchUtils.VALID_MODES.mkString(", ")))
+    }
+  }
+
+  test("heapifyDoubleTupleSketch: successfully deserializes valid tuple sketch 
bytes") {
+    // Create a valid tuple sketch and get its bytes
+    val summaryFactory = new DoubleSummaryFactory(DoubleSummary.Mode.Sum)
+    val updateSketch = new UpdatableSketchBuilder[java.lang.Double, 
DoubleSummary](summaryFactory)
+      .build()
+
+    updateSketch.update("test1", 1.0)
+    updateSketch.update("test2", 2.0)
+    updateSketch.update("test3", 3.0)
+
+    val compactSketch = updateSketch.compact()
+    val validBytes = compactSketch.toByteArray
+
+    // Test that heapifyDoubleTupleSketch can successfully deserialize the 
valid bytes
+    val heapifiedSketch = 
ThetaSketchUtils.heapifyDoubleTupleSketch(validBytes, "test_function")
+
+    assert(heapifiedSketch != null)
+    assert(heapifiedSketch.getEstimate == compactSketch.getEstimate)
+    assert(heapifiedSketch.getRetainedEntries == 
compactSketch.getRetainedEntries)
+  }
+
+  test("heapifyDoubleTupleSketch: throws exception for invalid bytes") {
+    val invalidBytes = Array[Byte](1, 2, 3, 4, 5)
+    checkError(
+      exception = intercept[SparkRuntimeException] {
+        ThetaSketchUtils.heapifyDoubleTupleSketch(invalidBytes, 
"test_function")
+      },
+      condition = "TUPLE_INVALID_INPUT_SKETCH_BUFFER",
+      parameters = Map(
+        "function" -> "`test_function`",
+        "reason" -> "Possible corruption: Invalid Family: COMPACT"))
+  }
+
+  test("getDoubleSummaryMode: returns correct mode for valid strings") {
+    assert(ThetaSketchUtils.getDoubleSummaryMode("sum") == 
DoubleSummary.Mode.Sum)
+    assert(ThetaSketchUtils.getDoubleSummaryMode("min") == 
DoubleSummary.Mode.Min)
+    assert(ThetaSketchUtils.getDoubleSummaryMode("max") == 
DoubleSummary.Mode.Max)
+    assert(ThetaSketchUtils.getDoubleSummaryMode("alwaysone") == 
DoubleSummary.Mode.AlwaysOne)
+  }
+
+  test("getIntegerSummaryMode: returns correct mode for valid strings") {
+    assert(ThetaSketchUtils.getIntegerSummaryMode("sum") == 
IntegerSummary.Mode.Sum)
+    assert(ThetaSketchUtils.getIntegerSummaryMode("min") == 
IntegerSummary.Mode.Min)
+    assert(ThetaSketchUtils.getIntegerSummaryMode("max") == 
IntegerSummary.Mode.Max)
+    assert(ThetaSketchUtils.getIntegerSummaryMode("alwaysone") == 
IntegerSummary.Mode.AlwaysOne)
+  }
+
+  test("heapifyIntegerTupleSketch: successfully deserializes valid tuple 
sketch bytes") {
+    import org.apache.datasketches.tuple.aninteger.{IntegerSummary, 
IntegerSummaryFactory}
+    // Create a valid integer tuple sketch and get its bytes
+    val summaryFactory = new IntegerSummaryFactory(IntegerSummary.Mode.Sum)
+    val updateSketch =
+      new UpdatableSketchBuilder[java.lang.Integer, 
IntegerSummary](summaryFactory)
+        .build()
+
+    updateSketch.update("test1", 1)
+    updateSketch.update("test2", 2)
+    updateSketch.update("test3", 3)
+
+    val compactSketch = updateSketch.compact()
+    val validBytes = compactSketch.toByteArray
+
+    // Test that heapifyIntegerTupleSketch can successfully deserialize the 
valid bytes
+    val heapifiedSketch = 
ThetaSketchUtils.heapifyIntegerTupleSketch(validBytes, "test_function")
+
+    assert(heapifiedSketch != null)
+    assert(heapifiedSketch.getEstimate == compactSketch.getEstimate)
+    assert(heapifiedSketch.getRetainedEntries == 
compactSketch.getRetainedEntries)
+  }
+
+  test("heapifyIntegerTupleSketch: throws exception for invalid bytes") {
+    val invalidBytes = Array[Byte](1, 2, 3, 4, 5)
+    checkError(
+      exception = intercept[SparkRuntimeException] {
+        ThetaSketchUtils.heapifyIntegerTupleSketch(invalidBytes, 
"test_function")
+      },
+      condition = "TUPLE_INVALID_INPUT_SKETCH_BUFFER",
+      parameters = Map(
+        "function" -> "`test_function`",
+        "reason" -> "Possible corruption: Invalid Family: COMPACT"))
+  }
+
+  test("heapifyStringTupleSketch: successfully deserializes valid tuple sketch 
bytes") {
+    import org.apache.datasketches.tuple.strings.{ArrayOfStringsSummary, 
ArrayOfStringsSummaryFactory}
+    // Create a valid string tuple sketch and get its bytes
+    val summaryFactory = new ArrayOfStringsSummaryFactory()
+    val updateSketch =
+      new UpdatableSketchBuilder[Array[String], 
ArrayOfStringsSummary](summaryFactory)
+        .build()
+
+    updateSketch.update("test1", Array("a"))
+    updateSketch.update("test2", Array("b"))
+    updateSketch.update("test3", Array("c"))
+
+    val compactSketch = updateSketch.compact()
+    val validBytes = compactSketch.toByteArray
+
+    // Test that heapifyStringTupleSketch can successfully deserialize the 
valid bytes
+    val heapifiedSketch = 
ThetaSketchUtils.heapifyStringTupleSketch(validBytes, "test_function")
+
+    assert(heapifiedSketch != null)
+    assert(heapifiedSketch.getEstimate == compactSketch.getEstimate)
+    assert(heapifiedSketch.getRetainedEntries == 
compactSketch.getRetainedEntries)
+  }
+
+  test("heapifyStringTupleSketch: throws exception for invalid bytes") {
+    val invalidBytes = Array[Byte](1, 2, 3, 4, 5)
+    checkError(
+      exception = intercept[SparkRuntimeException] {
+        ThetaSketchUtils.heapifyStringTupleSketch(invalidBytes, 
"test_function")
+      },
+      condition = "TUPLE_INVALID_INPUT_SKETCH_BUFFER",
+      parameters = Map(
+        "function" -> "`test_function`",
+        "reason" -> "Possible corruption: Invalid Family: COMPACT"))
+  }
+
+  test("aggregateNumericSummaries: sum mode aggregates correctly for Double") {
+    val summaryFactory = new DoubleSummaryFactory(DoubleSummary.Mode.Sum)
+    val updateSketch = new UpdatableSketchBuilder[java.lang.Double, 
DoubleSummary](summaryFactory)
+      .build()
+
+    updateSketch.update("test1", 1.0)
+    updateSketch.update("test2", 2.0)
+    updateSketch.update("test3", 3.0)
+
+    val compactSketch = updateSketch.compact()
+    val result = ThetaSketchUtils.aggregateNumericSummaries[DoubleSummary, 
Double](
+      compactSketch.iterator(),
+      "sum",
+      it => it.getSummary.getValue)
+
+    assert(result == 6.0)
+  }
+
+  test("aggregateNumericSummaries: min mode finds minimum for Double") {
+    val summaryFactory = new DoubleSummaryFactory(DoubleSummary.Mode.Sum)
+    val updateSketch = new UpdatableSketchBuilder[java.lang.Double, 
DoubleSummary](summaryFactory)
+      .build()
+
+    updateSketch.update("test1", 5.0)
+    updateSketch.update("test2", 2.0)
+    updateSketch.update("test3", 8.0)
+
+    val compactSketch = updateSketch.compact()
+    val result = ThetaSketchUtils.aggregateNumericSummaries[DoubleSummary, 
Double](
+      compactSketch.iterator(),
+      "min",
+      it => it.getSummary.getValue)
+
+    assert(result == 2.0)
+  }
+
+  test("aggregateNumericSummaries: max mode finds maximum for Double") {
+    val summaryFactory = new DoubleSummaryFactory(DoubleSummary.Mode.Sum)
+    val updateSketch = new UpdatableSketchBuilder[java.lang.Double, 
DoubleSummary](summaryFactory)
+      .build()
+
+    updateSketch.update("test1", 5.0)
+    updateSketch.update("test2", 2.0)
+    updateSketch.update("test3", 8.0)
+
+    val compactSketch = updateSketch.compact()
+    val result = ThetaSketchUtils.aggregateNumericSummaries[DoubleSummary, 
Double](
+      compactSketch.iterator(),
+      "max",
+      it => it.getSummary.getValue)
+
+    assert(result == 8.0)
+  }
+
+  test("aggregateNumericSummaries: alwaysone mode counts entries for Double") {
+    val summaryFactory = new DoubleSummaryFactory(DoubleSummary.Mode.Sum)
+    val updateSketch = new UpdatableSketchBuilder[java.lang.Double, 
DoubleSummary](summaryFactory)
+      .build()
+
+    updateSketch.update("test1", 5.0)
+    updateSketch.update("test2", 2.0)
+    updateSketch.update("test3", 8.0)
+
+    val compactSketch = updateSketch.compact()
+    val result = ThetaSketchUtils.aggregateNumericSummaries[DoubleSummary, 
Double](
+      compactSketch.iterator(),
+      "alwaysone",
+      it => it.getSummary.getValue)
+
+    assert(result == 3.0)
+  }
+
+  test("aggregateNumericSummaries: sum mode aggregates correctly for Long") {

Review Comment:
   I added more coverage for all-null inputs and null summaries in the .sql 
file. For the rest, I believe the tests already exist. For Intersection, 
lgNomEntries is not configurable, so I believe we cannot do that. Please let me 
know if I missed something 🫡



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to