dtenedor opened a new pull request, #52900:
URL: https://github.com/apache/spark/pull/52900

   ### What changes were proposed in this pull request?
   
   This PR adds DataFrame API support for the KLL quantile sketch functions that were previously added to Spark SQL in https://github.com/apache/spark/pull/52800. This lets users leverage KLL sketches through both the Scala and Python DataFrame APIs, in addition to the existing SQL interface.
   
   **Key additions:**
   
   1. **Scala DataFrame API** (`sql/api/src/main/scala/org/apache/spark/sql/functions.scala`):
      - 18 new functions covering the aggregate, to-string, item-count (`get_n`), merge, quantile, and rank operations
      - Multiple overloads for each function, supporting:
        - `Column` parameters for computed values
        - `String` parameters for column names
        - `Int` parameters for literal k values
        - An optional k parameter that falls back to a default when omitted
      - Functions for all three data type variants: `bigint`, `float`, and `double`
   
   2. **Python DataFrame API** (`python/pyspark/sql/functions/builtin.py`):
      - 18 corresponding Python functions with:
        - Comprehensive docstrings with usage examples
        - Proper type hints (`ColumnOrName`, `Optional[Union[int, Column]]`)
        - Support for both column objects and column name strings
      - Added to PySpark documentation reference
   
   3. **Python Spark Connect Support** (`python/pyspark/sql/connect/functions/builtin.py`):
      - The same functions are available to Spark Connect clients
      - All 18 functions are registered in the Connect functions module
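The 18 functions follow a `kll_sketch_<operation>_<type>` naming pattern: six operations (the ones exercised in the tests below) across `bigint`, `float`, and `double`. As a minimal sketch, assuming that pattern holds for every function, the snippet generates the expected names and reports any that a given namespace lacks, which is one way to check that the classic and Spark Connect Python modules stay in sync. `expected_kll_functions` and `missing_kll_functions` are hypothetical helpers, not PySpark APIs; against a real installation you would pass something like `dir(pyspark.sql.functions)`, which only lists these names if your PySpark build includes this PR.

```python
from typing import Iterable, List

# Assumed naming pattern: kll_sketch_<operation>_<type>.
OPERATIONS = ["agg", "to_string", "get_n", "merge", "get_quantile", "get_rank"]
TYPES = ["bigint", "float", "double"]


def expected_kll_functions() -> List[str]:
    """Return the 18 expected function names, grouped by operation."""
    return [f"kll_sketch_{op}_{t}" for op in OPERATIONS for t in TYPES]


def missing_kll_functions(namespace: Iterable[str]) -> List[str]:
    """Return expected names that do not appear in `namespace` (e.g. dir(module))."""
    available = set(namespace)
    return [name for name in expected_kll_functions() if name not in available]


# Usage with a made-up namespace that lacks the rank functions.
partial = [n for n in expected_kll_functions() if "get_rank" not in n]
print(len(expected_kll_functions()))  # 18
print(missing_kll_functions(partial))
# ['kll_sketch_get_rank_bigint', 'kll_sketch_get_rank_float', 'kll_sketch_get_rank_double']
```

Running the same check against both the classic and the Connect module and requiring an empty result from each keeps the two surfaces aligned.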
   
   ### Why are the changes needed?
   
   While the SQL API for KLL sketches was added previously, DataFrame API support is essential for:
   - **Programmatic workflows**: Users building Spark applications in Scala/Python need native API access
   - **Type safety**: In Scala, function names and argument types are checked at compile time, instead of when a SQL string is parsed at runtime
   - **Composability**: Operations are easier to chain and to integrate into existing DataFrame pipelines
   - **Consistency**: The other sketch functions (HLL, Theta) already provide DataFrame APIs
   - **IDE support**: Better autocomplete and inline documentation for developers
   
   Without DataFrame API support, users would be forced to use SQL expressions via `expr()` or `selectExpr()`, which is less ergonomic and not type-safe.
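To make the alternative concrete, the sketch below assembles by hand the SQL strings that an `expr()` or `selectExpr()` user would otherwise write for calls like those in the examples below. `kll_call` is a hypothetical helper, not a Spark API; it only formats strings, so a misspelled function name or a malformed array literal would surface only when Spark parses the string at runtime.

```python
from typing import Optional, Sequence, Union

Number = Union[int, float]


def kll_call(function: str, column: str,
             arg: Optional[Union[Number, Sequence[Number]]] = None) -> str:
    """Format a SQL call such as kll_sketch_get_quantile_bigint(`sketch`, 0.5).

    Hypothetical helper: a scalar `arg` (a k value or a rank) is rendered as a
    literal, and a list/tuple of ranks as an array(...) literal.
    """
    args = [f"`{column}`"]  # backticks quote the column name in Spark SQL
    if isinstance(arg, (list, tuple)):
        args.append("array(" + ", ".join(repr(x) for x in arg) + ")")
    elif arg is not None:
        args.append(repr(arg))
    return f"{function}({', '.join(args)})"


print(kll_call("kll_sketch_agg_bigint", "value", 400))
# kll_sketch_agg_bigint(`value`, 400)
print(kll_call("kll_sketch_get_quantile_bigint", "sketch", [0.25, 0.5, 0.75]))
# kll_sketch_get_quantile_bigint(`sketch`, array(0.25, 0.5, 0.75))
```

With the DataFrame functions, the same calls are ordinary function invocations, so the compiler (in Scala) or the IDE (in Python) can catch mistakes before the job runs.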
   
   ### Does this PR introduce any user-facing change?
   
   Yes, this PR adds DataFrame API support for the 18 KLL sketch functions:
   
   **Scala DataFrame API Example:**
   ```scala
   import org.apache.spark.sql.functions._
   import spark.implicits._ // enables toDF and the $"col" syntax; assumes a SparkSession named `spark`
   
   // Create sketch with default k
   val df = Seq(1, 2, 3, 4, 5).toDF("value")
   val sketch = df.agg(kll_sketch_agg_bigint($"value"))
   
   // Create sketch with custom k value
   val sketch2 = df.agg(kll_sketch_agg_bigint("value", 400))
   
   // Get median (0.5 quantile)
   val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
   val median = sketchDf.select(kll_sketch_get_quantile_bigint($"sketch", lit(0.5)))
   
   // Get multiple quantiles
   val quantiles = sketchDf.select(
     kll_sketch_get_quantile_bigint($"sketch", array(lit(0.25), lit(0.5), lit(0.75)))
   )
   
   // Merge sketches
   val merged = sketchDf.select(
     kll_sketch_merge_bigint($"sketch", $"sketch").alias("merged")
   )
   
   // Get count of items
   val count = sketchDf.select(kll_sketch_get_n_bigint($"sketch"))
   ```
   
   **Python DataFrame API Example:**
   ```python
   from pyspark.sql import functions as sf
   
   # Create sketch with default k (an "INT" schema yields a single column named "value")
   df = spark.createDataFrame([1, 2, 3, 4, 5], "INT")
   sketch = df.agg(sf.kll_sketch_agg_bigint("value"))
   
   # Create sketch with custom k value
   sketch2 = df.agg(sf.kll_sketch_agg_bigint("value", 400))
   
   # Get median (0.5 quantile)
   sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
   median = sketch_df.select(sf.kll_sketch_get_quantile_bigint("sketch", sf.lit(0.5)))
   
   # Get multiple quantiles
   quantiles = sketch_df.select(
       sf.kll_sketch_get_quantile_bigint("sketch", sf.array(sf.lit(0.25), sf.lit(0.5), sf.lit(0.75)))
   )
   
   # Merge sketches
   merged = sketch_df.select(
       sf.kll_sketch_merge_bigint("sketch", "sketch").alias("merged")
   )
   
   # Get count of items
   count = sketch_df.select(sf.kll_sketch_get_n_bigint("sketch"))
   ```
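The quantile and rank functions return estimates derived from the sketch. When checking results on small inputs, it can help to compare them against exact values computed from the raw data. The snippet below is a plain-Python reference, not Spark's implementation, and it assumes the *inclusive* convention: the rank of `v` is the fraction of items `<= v`, and the quantile at rank `r` is the smallest item whose rank is at least `r`. Whether Spark's KLL functions use this convention is an assumption to verify against the SQL function documentation from the earlier PR.

```python
import bisect
import math
from typing import Sequence


def exact_rank(values: Sequence[float], v: float) -> float:
    """Normalized inclusive rank: fraction of items <= v."""
    s = sorted(values)
    return bisect.bisect_right(s, v) / len(s)


def exact_quantile(values: Sequence[float], rank: float) -> float:
    """Smallest item whose normalized inclusive rank is >= `rank`."""
    if not 0.0 <= rank <= 1.0:
        raise ValueError("rank must be in [0, 1]")
    s = sorted(values)
    # The item at sorted index i has inclusive rank (i + 1) / n.
    i = max(math.ceil(rank * len(s)) - 1, 0)
    return s[i]


data = [1, 2, 3, 4, 5]  # same input as the examples above
print([exact_quantile(data, r) for r in (0.25, 0.5, 0.75)])  # [2, 3, 4]
print(exact_rank(data, 3))  # 0.6
```

On larger inputs KLL results are approximate, so a comparison like this would need a tolerance rather than exact equality.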
   
   ### How was this patch tested?
   
   1. **Scala Unit Tests** (`DataFrameAggregateSuite`):
      - `kll_sketch_agg_{bigint,float,double}` with default and explicit k values
      - `kll_sketch_to_string` functions for all data types
      - `kll_sketch_get_n` functions for all data types
      - `kll_sketch_merge` operations
      - `kll_sketch_get_quantile` with single rank and array of ranks
      - `kll_sketch_get_rank` operations
      - Null value handling tests
      - **Result**: All 11 tests pass successfully
   
   2. **Python Unit Tests** (`test_functions.py`):
      - Comprehensive tests mirroring Scala tests
      - Tests for Column object and string column name overloads
      - Tests for optional k parameter
      - Array input tests for quantile/rank functions
      - Null handling validation
      - Type checking (bytes/bytearray for sketches, str for to_string, int/float for values)
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Yes, IDE assistance used `claude-4.5-sonnet` with manual validation and integration.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.