cboumalh commented on code in PR #51298:
URL: https://github.com/apache/spark/pull/51298#discussion_r2331247613


##########
python/pyspark/sql/functions/builtin.py:
##########
@@ -25704,6 +25705,385 @@ def hll_union(
         return _invoke_function("hll_union", _to_java_column(col1), _to_java_column(col2))
 
 
+@_try_remote_functions
+def theta_sketch_agg(
+    col: "ColumnOrName",
+    lgNomEntries: Optional[Union[int, Column]] = None,
+) -> Column:
+    """
+    Aggregate function: returns the compact binary representation of the Datasketches
+    ThetaSketch configured with lgNomEntries arg.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+    lgNomEntries : :class:`~pyspark.sql.Column` or int, optional
+        The log-base-2 of nominal entries, where nominal entries is the size of the sketch
+        (must be between 4 and 26, defaults to 12)
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        The binary representation of the ThetaSketch.
+
+    See Also
+    --------
+    :meth:`pyspark.sql.functions.theta_union`
+    :meth:`pyspark.sql.functions.theta_intersection`
+    :meth:`pyspark.sql.functions.theta_difference`
+    :meth:`pyspark.sql.functions.theta_union_agg`
+    :meth:`pyspark.sql.functions.theta_intersection_agg`
+    :meth:`pyspark.sql.functions.theta_sketch_estimate`
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([1,2,2,3], "INT")
+    >>> df.agg(sf.theta_sketch_estimate(sf.theta_sketch_agg("value"))).show()
+    +--------------------------------------------------+
+    |theta_sketch_estimate(theta_sketch_agg(value, 12))|
+    +--------------------------------------------------+
+    |                                                 3|
+    +--------------------------------------------------+
+
+    >>> df.agg(sf.theta_sketch_estimate(sf.theta_sketch_agg("value", 15))).show()
+    +--------------------------------------------------+
+    |theta_sketch_estimate(theta_sketch_agg(value, 15))|
+    +--------------------------------------------------+
+    |                                                 3|
+    +--------------------------------------------------+
+    """
+    fn = "theta_sketch_agg"
+    if lgNomEntries is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(lgNomEntries))
+
+
+@_try_remote_functions
+def theta_union_agg(
+    col: "ColumnOrName",
+    lgNomEntries: Optional[Union[int, Column]] = None,
+) -> Column:
+    """
+    Aggregate function: returns the compact binary representation of the Datasketches
+    ThetaSketch, generated by merging previously created Datasketches ThetaSketch instances
+    via a Datasketches Union instance.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+    lgNomEntries : :class:`~pyspark.sql.Column` or int, optional
+        The log-base-2 of nominal entries for the union operation
+        (must be between 4 and 26, defaults to 12)
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        The binary representation of the merged ThetaSketch.
+
+    See Also
+    --------
+    :meth:`pyspark.sql.functions.theta_union`
+    :meth:`pyspark.sql.functions.theta_sketch_agg`
+    :meth:`pyspark.sql.functions.theta_sketch_estimate`
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df1 = spark.createDataFrame([1,2,2,3], "INT")
+    >>> df1 = df1.agg(sf.theta_sketch_agg("value").alias("sketch"))
+    >>> df2 = spark.createDataFrame([4,5,5,6], "INT")
+    >>> df2 = df2.agg(sf.theta_sketch_agg("value").alias("sketch"))
+    >>> df3 = df1.union(df2)
+    >>> df3.agg(sf.theta_sketch_estimate(sf.theta_union_agg("sketch"))).show()
+    +--------------------------------------------------+
+    |theta_sketch_estimate(theta_union_agg(sketch, 12))|
+    +--------------------------------------------------+
+    |                                                 6|
+    +--------------------------------------------------+
+    """
+    fn = "theta_union_agg"
+    if lgNomEntries is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(lgNomEntries))
+
+
+@_try_remote_functions
+def theta_intersection_agg(
+    col: "ColumnOrName",
+    lgNomEntries: Optional[Union[int, Column]] = None,
+) -> Column:
+    """
+    Aggregate function: returns the compact binary representation of the Datasketches
+    ThetaSketch, generated by intersecting previously created Datasketches ThetaSketch
+    instances via a Datasketches Intersection instance.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+    lgNomEntries : :class:`~pyspark.sql.Column` or int, optional
+        The log-base-2 of nominal entries for the intersection operation
+        (must be between 4 and 26, defaults to 12)
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        The binary representation of the intersected ThetaSketch.
+
+    See Also
+    --------
+    :meth:`pyspark.sql.functions.theta_intersection`
+    :meth:`pyspark.sql.functions.theta_sketch_agg`
+    :meth:`pyspark.sql.functions.theta_sketch_estimate`
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df1 = spark.createDataFrame([1,2,2,3], "INT")
+    >>> df1 = df1.agg(sf.theta_sketch_agg("value").alias("sketch"))
+    >>> df2 = spark.createDataFrame([2,3,3,4], "INT")
+    >>> df2 = df2.agg(sf.theta_sketch_agg("value").alias("sketch"))
+    >>> df3 = df1.union(df2)
+    >>> df3.agg(sf.theta_sketch_estimate(sf.theta_intersection_agg("sketch"))).show()
+    +---------------------------------------------------------+
+    |theta_sketch_estimate(theta_intersection_agg(sketch, 12))|
+    +---------------------------------------------------------+
+    |                                                        2|
+    +---------------------------------------------------------+
+    """
+    fn = "theta_intersection_agg"
+    if lgNomEntries is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(lgNomEntries))
+
+
+@_try_remote_functions
+def theta_sketch_estimate(col: "ColumnOrName") -> Column:
+    """
+    Returns the estimated number of unique values given the binary representation
+    of a Datasketches ThetaSketch.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        The estimated number of unique values for the ThetaSketch.
+
+    See Also
+    --------
+    :meth:`pyspark.sql.functions.theta_union`
+    :meth:`pyspark.sql.functions.theta_intersection`
+    :meth:`pyspark.sql.functions.theta_difference`
+    :meth:`pyspark.sql.functions.theta_union_agg`
+    :meth:`pyspark.sql.functions.theta_intersection_agg`
+    :meth:`pyspark.sql.functions.theta_sketch_agg`
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([1,2,2,3], "INT")
+    >>> df.agg(sf.theta_sketch_estimate(sf.theta_sketch_agg("value"))).show()
+    +--------------------------------------------------+
+    |theta_sketch_estimate(theta_sketch_agg(value, 12))|
+    +--------------------------------------------------+
+    |                                                 3|
+    +--------------------------------------------------+
+    """
+    from pyspark.sql.classic.column import _to_java_column
+
+    fn = "theta_sketch_estimate"
+    return _invoke_function(fn, _to_java_column(col))
+
+
+@_try_remote_functions
+def theta_union(
+    col1: "ColumnOrName", col2: "ColumnOrName", lgNomEntries: Optional[int] = None
+) -> Column:
+    """
+    Merges two binary representations of Datasketches ThetaSketch objects, using a
+    Datasketches Union object.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    col1 : :class:`~pyspark.sql.Column` or column name
+    col2 : :class:`~pyspark.sql.Column` or column name
+    lgNomEntries : int, optional
+        The log-base-2 of nominal entries for the union operation
+        (must be between 4 and 26, defaults to 12)
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        The binary representation of the merged ThetaSketch.
+
+    See Also
+    --------
+    :meth:`pyspark.sql.functions.theta_union_agg`
+    :meth:`pyspark.sql.functions.theta_sketch_agg`
+    :meth:`pyspark.sql.functions.theta_sketch_estimate`
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([(1,4),(2,5),(2,5),(3,6)], "struct<v1:int,v2:int>")
+    >>> df = df.agg(
+    ...     sf.theta_sketch_agg("v1").alias("sketch1"),
+    ...     sf.theta_sketch_agg("v2").alias("sketch2")
+    ... )
+    >>> df.select(sf.theta_sketch_estimate(sf.theta_union(df.sketch1, "sketch2"))).show()
+    +--------------------------------------------------------+
+    |theta_sketch_estimate(theta_union(sketch1, sketch2, 12))|
+    +--------------------------------------------------------+
+    |                                                       6|
+    +--------------------------------------------------------+
+    """
+    from pyspark.sql.classic.column import _to_java_column
+
+    fn = "theta_union"
+    if lgNomEntries is not None:
+        return _invoke_function(
+            fn,
+            _to_java_column(col1),
+            _to_java_column(col2),
+            _enum_to_value(lgNomEntries),

Review Comment:
   Thanks for pointing this out. The template for this function was copied from
the HLL implementation, and I forgot to change these lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to