zhengruifeng commented on code in PR #49231:
URL: https://github.com/apache/spark/pull/49231#discussion_r1891552045


##########
python/pyspark/sql/functions/builtin.py:
##########
@@ -1851,6 +1851,157 @@ def sum_distinct(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("sum_distinct", col)
 
 
+@_try_remote_functions
+def listagg(col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None) -> Column:
+    """
+    Aggregate function: returns the concatenation of non-null input values,
+    separated by the delimiter.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        target column to compute on.
+    delimiter : :class:`~pyspark.sql.Column` or str, optional

Review Comment:
   ```suggestion
       delimiter : :class:`~pyspark.sql.Column`, literal string or bytes, optional
   ```
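
For reference, the behaviour the docstring describes (nulls skipped, values joined by a `str` or `bytes` delimiter, NULL when every input is null) can be modelled in plain Python. This is only a sketch for checking the documented examples, not the Spark implementation; `listagg_model` is a hypothetical name and does not exist in PySpark.

```python
from typing import Iterable, Optional, Union

Value = Union[str, bytes]


def listagg_model(
    values: Iterable[Optional[Value]], delimiter: Optional[Value] = None
) -> Optional[Value]:
    """Hypothetical pure-Python model of the documented listagg semantics."""
    # Null inputs are ignored, as the docstring states.
    non_null = [v for v in values if v is not None]
    # With no non-null input the result is NULL (None here), as in Example 4.
    if not non_null:
        return None
    # No delimiter: join with an empty str or bytes matching the value type.
    if delimiter is None:
        delimiter = non_null[0][:0]
    return delimiter.join(non_null)


print(listagg_model(["a", "b", None, "c"]))
print(listagg_model(["a", "b", None, "c"], ", "))
print(listagg_model([b"\x01", b"\x02", None, b"\x03"], b"\x42"))
print(listagg_model([None, None]))
```

The four calls mirror Examples 1 to 4 in the docstring, with the delimiter passed as `str` or `bytes` to match the column type.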



##########
python/pyspark/sql/functions/builtin.py:
##########
@@ -1851,6 +1851,157 @@ def sum_distinct(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("sum_distinct", col)
 
 
+@_try_remote_functions
+def listagg(col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None) -> Column:
+    """
+    Aggregate function: returns the concatenation of non-null input values,
+    separated by the delimiter.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        target column to compute on.
+    delimiter : :class:`~pyspark.sql.Column` or str, optional
+        the delimiter to separate the values. The default value is None.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+
+    Examples
+    --------
+    Example 1: Using listagg function
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([('a',), ('b',), (None,), ('c',)], ['strings'])
+    >>> df.select(sf.listagg('strings')).show()
+    +----------------------+
+    |listagg(strings, NULL)|
+    +----------------------+
+    |                   abc|
+    +----------------------+
+
+    Example 2: Using listagg function with a delimiter
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([('a',), ('b',), (None,), ('c',)], ['strings'])
+    >>> df.select(sf.listagg('strings', ', ')).show()
+    +--------------------+
+    |listagg(strings, , )|
+    +--------------------+
+    |             a, b, c|
+    +--------------------+
+
+    Example 3: Using listagg function with a binary column and delimiter
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([(b'\x01',), (b'\x02',), (None,), (b'\x03',)], ['bytes'])
+    >>> df.select(sf.listagg('bytes', b'\x42')).show()
+    +---------------------+
+    |listagg(bytes, X'42')|
+    +---------------------+
+    |     [01 42 02 42 03]|
+    +---------------------+
+
+    Example 4: Using listagg function on a column with all None values
+
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import StructType, StructField, StringType
+    >>> schema = StructType([StructField("strings", StringType(), True)])
+    >>> df = spark.createDataFrame([(None,), (None,), (None,), (None,)], schema=schema)
+    >>> df.select(sf.listagg('strings')).show()
+    +----------------------+
+    |listagg(strings, NULL)|
+    +----------------------+
+    |                  NULL|
+    +----------------------+
+    """
+    if delimiter is None:
+        return _invoke_function_over_columns("listagg", col)
+    else:
+        return _invoke_function_over_columns("listagg", col, lit(delimiter))
+
+
+@_try_remote_functions
+def listagg_distinct(
+    col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None
+) -> Column:
+    """
+    Aggregate function: returns the concatenation of distinct non-null input values,
+    separated by the delimiter.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        target column to compute on.
+    delimiter : :class:`~pyspark.sql.Column` or str, optional

Review Comment:
   ```suggestion
       delimiter : :class:`~pyspark.sql.Column`, literal string or bytes, optional
   ```
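
The distinct variant differs only in dropping duplicate values before joining. A rough pure-Python sketch follows; `listagg_distinct_model` is a hypothetical name, and keeping first-seen order is an assumption made here for readability, since the docstring does not specify an output order.

```python
from typing import Iterable, Optional, Union

Value = Union[str, bytes]


def listagg_distinct_model(
    values: Iterable[Optional[Value]], delimiter: Optional[Value] = None
) -> Optional[Value]:
    """Hypothetical model: like listagg, but duplicates are removed first."""
    # dict.fromkeys drops duplicates and keeps first-seen order
    # (the ordering is an assumption of this sketch, not documented behaviour).
    distinct = list(dict.fromkeys(v for v in values if v is not None))
    if not distinct:
        return None
    if delimiter is None:
        delimiter = distinct[0][:0]
    return delimiter.join(distinct)


print(listagg_distinct_model(["a", "b", "a", None, "c", "b"], ", "))
```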



##########
python/pyspark/sql/tests/test_functions.py:
##########
@@ -84,9 +85,6 @@ def test_function_parity(self):
 
         # Functions that we expect to be missing in python until they are added to pyspark
         expected_missing_in_py = {
-            # TODO(SPARK-50220): listagg functions will soon be added and removed from this list
-            "listagg_distinct",
-            "listagg",
             "string_agg",

Review Comment:
   Do we plan to add the aliases `string_agg` and `string_agg_distinct`?



##########
python/pyspark/sql/connect/functions/builtin.py:
##########
@@ -1064,6 +1064,28 @@ def collect_set(col: "ColumnOrName") -> Column:
 collect_set.__doc__ = pysparkfuncs.collect_set.__doc__
 
 
+def listagg(col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None) -> Column:
+    if delimiter is None:
+        return _invoke_function_over_columns("listagg", col)
+    else:
+        return _invoke_function_over_columns("listagg", col, lit(delimiter))
+
+
+listagg.__doc__ = pysparkfuncs.listagg.__doc__
+
+
+def listagg_distinct(
+    col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None
+) -> Column:
+    if delimiter is None:
+        return _invoke_function_over_columns("listagg_distinct", col)

Review Comment:
   I suspect this won't work, since we don't have `listagg_distinct` in `FunctionRegistry`.
   
   In Spark Connect, `_invoke_function_over_columns` just builds an unresolved function, which the server then has to resolve by name.
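
To illustrate the concern, here is a toy model of name-based resolution. It is not Spark's code: `TOY_REGISTRY`, `ToyUnresolvedFunction` and `resolve` are hypothetical stand-ins, and expressing "distinct" as a flag on the base function is an assumption about how the fix might look.

```python
from dataclasses import dataclass
from typing import Tuple

# Hypothetical stand-in for the server-side registry: only base names exist.
TOY_REGISTRY = {"listagg", "count", "sum"}


@dataclass(frozen=True)
class ToyUnresolvedFunction:
    """Hypothetical client-side node: just a name, arguments and a flag."""

    name: str
    args: Tuple[str, ...]
    is_distinct: bool = False


def resolve(fn: ToyUnresolvedFunction) -> str:
    """Look the function up by name, as a server-side analyzer would."""
    if fn.name not in TOY_REGISTRY:
        raise LookupError(f"undefined function: {fn.name}")
    prefix = "DISTINCT " if fn.is_distinct else ""
    return f"{fn.name}({prefix}{', '.join(fn.args)})"


# Resolves: the registered base name plus a distinct flag.
print(resolve(ToyUnresolvedFunction("listagg", ("strings",), is_distinct=True)))

# Fails: the client built a node whose name is not registered.
try:
    resolve(ToyUnresolvedFunction("listagg_distinct", ("strings",)))
except LookupError as exc:
    print(exc)
```

In this model the client never checks the name itself, so an unregistered name only surfaces as an error once the server tries to resolve it.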



##########
python/pyspark/sql/connect/functions/builtin.py:
##########
@@ -1064,6 +1064,28 @@ def collect_set(col: "ColumnOrName") -> Column:
 collect_set.__doc__ = pysparkfuncs.collect_set.__doc__
 
 
+def listagg(col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None) -> Column:
+    if delimiter is None:
+        return _invoke_function_over_columns("listagg", col)
+    else:
+        return _invoke_function_over_columns("listagg", col, lit(delimiter))
+
+
+listagg.__doc__ = pysparkfuncs.listagg.__doc__
+
+
+def listagg_distinct(
+    col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None
+) -> Column:
+    if delimiter is None:
+        return _invoke_function_over_columns("listagg_distinct", col)

Review Comment:
   I think you can refer to https://github.com/apache/spark/blob/05750def2da6fe16fc0cf8a3eee79bb2056df979/python/pyspark/sql/connect/functions/builtin.py#L1090-L1096



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

