dtenedor commented on code in PR #54338:
URL: https://github.com/apache/spark/pull/54338#discussion_r2823742380
##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -3904,6 +3904,213 @@ class DataFrameAggregateSuite extends QueryTest
assert(estimate == 1.0)
}
+ test("SPARK-55558: tuple_difference_theta_double basic functionality") {
+ val df1 = Seq((1, 1.5), (2, 2.5), (3, 3.5), (5, 5.5)).toDF("key",
"summary")
+ val df2 = Seq(1, 2, 4).toDF("value")
+
+ val tupleSketchDf = df1.agg(tuple_sketch_agg_double($"key",
$"summary").alias("tuple_sketch"))
+ val thetaSketchDf =
df2.agg(theta_sketch_agg($"value").alias("theta_sketch"))
+
+ val joined = tupleSketchDf.crossJoin(thetaSketchDf)
+
+ // Test difference (keys in tuple_sketch but not in theta_sketch: 3 and 5)
+ val difference = joined
+ .select(tuple_difference_theta_double($"tuple_sketch", $"theta_sketch"))
+ .collect()(0)(0)
+ assert(difference != null)
+ assert(difference.asInstanceOf[Array[Byte]].length > 0)
+
+ // Test with column names
+ val difference2 = joined
+ .select(tuple_difference_theta_double("tuple_sketch", "theta_sketch"))
+ .collect()(0)(0)
+ assert(difference2 != null)
+
+ // Verify estimate from difference
+ val estimate = joined
+ .select(tuple_sketch_estimate_double(
+ tuple_difference_theta_double($"tuple_sketch", $"theta_sketch")))
+ .collect()(0)(0)
+ assert(estimate == 2.0)
Review Comment:
We're checking the cardinality estimates here; can we also check the summary
values for correctness?
##########
sql/core/src/test/resources/sql-tests/inputs/tuplesketch.sql:
##########
@@ -1013,6 +1146,46 @@ SELECT tuple_sketch_estimate_integer(
lgNomEntries => 14))
FROM t_int_int_1_5_through_7_11;
+-- Test tuple_union_theta_double with named parameters - only required params
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ first => tuple_sketch_agg_double(key1, val1),
+ second => theta_sketch_agg(key2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_double with named parameters - setting only
lgNomEntries
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ first => tuple_sketch_agg_double(key1, val1),
+ second => theta_sketch_agg(key2),
+ lgNomEntries => 14))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_double with named parameters - setting only mode
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ first => tuple_sketch_agg_double(key1, val1),
+ second => theta_sketch_agg(key2),
+ mode => 'max'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_double with named parameters - different order
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ mode => 'min',
+ lgNomEntries => 15,
+ second => theta_sketch_agg(key2),
+ first => tuple_sketch_agg_double(key1, val1)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_integer with named parameters - setting only
lgNomEntries
Review Comment:
Other test ideas:
What happens when one or both inputs are empty sketches? e.g.:
* Union of a non-empty TupleSketch with an empty ThetaSketch
* Intersection of a TupleSketch with an empty ThetaSketch (should return
empty)
* Difference of a TupleSketch minus an empty ThetaSketch (should return the
original TupleSketch unchanged)
Can we exercise null inputs for one or both arguments?
Can we validate that after intersection, the summaries are preserved from the
tuple sketch side (since theta entries get identity defaults), and that after
difference, the remaining entries retain their original summaries?
##########
sql/core/src/test/resources/sql-tests/inputs/tuplesketch.sql:
##########
@@ -371,6 +371,139 @@ SELECT tuple_sketch_estimate_integer(
tuple_sketch_agg_integer(key2, val2, 12, 'sum')))
FROM t_int_int_1_5_through_7_11;
+-- Test tuple_union_theta_double function with IntegerType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_double function with LongType key sketches and
explicit lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ tuple_sketch_agg_double(key1, val1, 15),
+ theta_sketch_agg(key2), 15))
+FROM t_long_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_double function with DoubleType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_double_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_union_theta_double function with StringType key sketches and
explicit lgNomEntries parameter
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2), 14))
+FROM t_string_double_a_d_through_e_h;
+
+-- Test tuple_union_theta_double with lgNomEntries and mode parameters
+SELECT tuple_sketch_estimate_double(
+ tuple_union_theta_double(
+ tuple_sketch_agg_double(key1, val1, 12, 'sum'),
+ theta_sketch_agg(key2), 12, 'sum'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_union_theta_integer function with IntegerType key sketches
+SELECT tuple_sketch_estimate_integer(
+ tuple_union_theta_integer(
+ tuple_sketch_agg_integer(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_union_theta_integer with lgNomEntries and mode parameters
+SELECT tuple_sketch_estimate_integer(
+ tuple_union_theta_integer(
+ tuple_sketch_agg_integer(key1, val1, 12, 'sum'),
+ theta_sketch_agg(key2), 12, 'sum'))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_intersection_theta_double function with IntegerType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_intersection_theta_double function with LongType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_theta_double(
+ tuple_sketch_agg_double(key1, val1, 5),
+ theta_sketch_agg(key2)))
+FROM t_long_double_1_5_through_7_11;
+
+-- Test tuple_intersection_theta_double function with DoubleType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_double_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_intersection_theta_double function with StringType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_string_double_a_d_through_e_h;
+
+-- Test tuple_intersection_theta_double with mode parameter
+SELECT tuple_sketch_estimate_double(
+ tuple_intersection_theta_double(
+ tuple_sketch_agg_double(key1, val1, 12, 'min'),
+ theta_sketch_agg(key2), 'min'))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_intersection_theta_integer function with IntegerType key sketches
+SELECT tuple_sketch_estimate_integer(
+ tuple_intersection_theta_integer(
+ tuple_sketch_agg_integer(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_intersection_theta_integer with mode parameter
+SELECT tuple_sketch_estimate_integer(
+ tuple_intersection_theta_integer(
+ tuple_sketch_agg_integer(key1, val1, 12, 'sum'),
+ theta_sketch_agg(key2), 'sum'))
+FROM t_int_int_1_5_through_7_11;
+
+-- Test tuple_difference_theta_double function with IntegerType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_int_double_1_5_through_7_11;
+
+-- Test tuple_difference_theta_double function with LongType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_long_double_1_5_through_7_11;
+
+-- Test tuple_difference_theta_double function with DoubleType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_double_double_1_1_1_4_through_1_5_1_8;
+
+-- Test tuple_difference_theta_double function with StringType key sketches
+SELECT tuple_sketch_estimate_double(
+ tuple_difference_theta_double(
+ tuple_sketch_agg_double(key1, val1),
+ theta_sketch_agg(key2)))
+FROM t_string_double_a_d_through_e_h;
+
+-- Test tuple_difference_theta_integer function with IntegerType key sketches
+SELECT tuple_sketch_estimate_integer(
+ tuple_difference_theta_integer(
+ tuple_sketch_agg_integer(key1, val1, 12, 'sum'),
Review Comment:
Can we also test other supported aggregation modes: "max", "min",
"alwaysone"?
##########
docs/sql-ref-sketch-aggregates.md:
##########
@@ -748,6 +751,138 @@ FROM VALUES (1, 10.0, 4, 40.0), (2, 20.0, 4, 40.0), (3,
30.0, 5, 50.0), (4, 40.0
---
+### tuple_union_theta_*
+
+Merges a Tuple sketch with a Theta sketch using union (scalar function). This
combines distinct keys from both sketches, with Theta sketch entries assigned a
default summary value.
Review Comment:
The default values are identity elements for each mode (0.0 for sum, +Inf
for min, -Inf for max, 1.0 for alwaysone). This is correct but non-obvious. Can
these docs and the `@ExpressionDescription` explicitly state what default value
Theta entries receive for each mode?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/tupleDifference.scala:
##########
@@ -97,6 +97,78 @@ case class TupleDifferenceInteger(left: Expression, right:
Expression)
}
}
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(tupleSketch, thetaSketch) - Subtracts the binary representation of a
+ Datasketches ThetaSketch from a TupleSketch with double summary data type
using a TupleSketch
+ AnotB object. Returns elements in the TupleSketch that are not in the
ThetaSketch. """,
+ examples = """
+ Examples:
+ > SELECT
tuple_sketch_estimate_double(_FUNC_(tuple_sketch_agg_double(col1, val1),
theta_sketch_agg(col2))) FROM VALUES (5, 5.0D, 4), (1, 1.0D, 4), (2, 2.0D, 5),
(3, 3.0D, 1) tab(col1, val1, col2);
+ 2.0
+ """,
+ group = "sketch_funcs",
+ since = "4.2.0")
+// scalastyle:on line.size.limit
+case class TupleDifferenceThetaDouble(left: Expression, right: Expression)
Review Comment:
If a user passes these two binaries in the wrong order (theta first, tuple
second), they'll get a confusing deserialization error rather than a clear type
mismatch. This is an inherent limitation of the binary type approach but could
be mitigated with a better error message. Is there any way we could exercise
this in testing, and possibly catch the specific DataSketches exception and
convert it to a Spark error class that helps the user understand what went wrong?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]