This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8355d8b7c76 [SPARK-41363][CONNECT][PYTHON] Implement normal functions
8355d8b7c76 is described below

commit 8355d8b7c768bc2728501849e21ac34f44955fd5
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Mon Dec 5 09:50:23 2022 +0900

    [SPARK-41363][CONNECT][PYTHON] Implement normal functions
    
    ### What changes were proposed in this pull request?
    Implement [the normal functions](https://github.com/apache/spark/blob/master/python/docs/source/reference/pyspark.sql/functions.rst#normal-functions), except:
    
    - `bitwiseNOT` - deprecated since 3.2.0
    - `broadcast` - depends on `DataFrame.hint` being added first
    - `when` - depends on `CaseWhen` in the proto message and `Column.{when, otherwise}`
    
    ### Why are the changes needed?
    For API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, new APIs are added.
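    
    A minimal usage sketch of the new APIs (illustrative only; `spark` is assumed to be a
    Spark Connect session, created the same way as in the added test):
    
    ```python
    from pyspark.sql.connect import functions as CF
    
    cdf = spark.sql("SELECT * FROM VALUES (0, float('NAN'), NULL), (1, NULL, 2.0) AS tab(a, b, c)")
    cdf.select(
        CF.coalesce(cdf.b, CF.lit(0.0)),   # first non-null value per row
        CF.isnan(cdf.b),                   # NaN check
        CF.bitwise_not(CF.lit(0)),         # bitwise negation of a literal
    ).toPandas()
    ```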
    
    ### How was this patch tested?
    Added unit tests.
    
    Closes #38884 from zhengruifeng/connect_function_normal.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/functions.py            | 505 ++++++++++++++++++++-
 .../sql/tests/connect/test_connect_function.py     |  83 ++++
 2 files changed, 587 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index 76991fca2bc..9b1163a1bdf 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -20,9 +20,10 @@ from pyspark.sql.connect.column import (
     LiteralExpression,
     ColumnReference,
     UnresolvedFunction,
+    SQLExpression,
 )
 
-from typing import Any, TYPE_CHECKING, Union, List
+from typing import Any, TYPE_CHECKING, Union, List, Optional
 
 if TYPE_CHECKING:
     from pyspark.sql.connect._typing import ColumnOrName
@@ -78,6 +79,9 @@ def _invoke_binary_math_function(name: str, col1: Any, col2: Any) -> Column:
     return _invoke_function(name, *_cols)
 
 
+# Normal Functions
+
+
 def col(col: str) -> Column:
     return Column(ColumnReference(col))
 
@@ -89,6 +93,505 @@ def lit(col: Any) -> Column:
     return Column(LiteralExpression(col))
 
 
+# def bitwiseNOT(col: "ColumnOrName") -> Column:
+#     """
+#     Computes bitwise not.
+#
+#     .. versionadded:: 1.4.0
+#
+#     .. deprecated:: 3.2.0
+#         Use :func:`bitwise_not` instead.
+#     """
+#     warnings.warn("Deprecated in 3.2, use bitwise_not instead.", FutureWarning)
+#     return bitwise_not(col)
+
+
+def bitwise_not(col: "ColumnOrName") -> Column:
+    """
+    Computes bitwise not.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+
+    Examples
+    --------
+    >>> df = spark.range(1)
+    >>> df.select(bitwise_not(lit(0))).show()
+    +---+
+    | ~0|
+    +---+
+    | -1|
+    +---+
+    >>> df.select(bitwise_not(lit(1))).show()
+    +---+
+    | ~1|
+    +---+
+    | -2|
+    +---+
+    """
+    return _invoke_function_over_columns("~", col)
+
+
+# TODO(SPARK-41364): support broadcast
+# def broadcast(df: DataFrame) -> DataFrame:
+#     """
+#     Marks a DataFrame as small enough for use in broadcast joins.
+#
+#     .. versionadded:: 1.6.0
+#
+#     Returns
+#     -------
+#     :class:`~pyspark.sql.DataFrame`
+#         DataFrame marked as ready for broadcast join.
+#
+#     Examples
+#     --------
+#     >>> from pyspark.sql import types
+#     >>> df = spark.createDataFrame([1, 2, 3, 3, 4], types.IntegerType())
+#     >>> df_small = spark.range(3)
+#     >>> df_b = broadcast(df_small)
+#     >>> df.join(df_b, df.value == df_small.id).show()
+#     +-----+---+
+#     |value| id|
+#     +-----+---+
+#     |    1|  1|
+#     |    2|  2|
+#     +-----+---+
+#     """
+#
+#     sc = SparkContext._active_spark_context
+#     assert sc is not None and sc._jvm is not None
+#     return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sparkSession)
+
+
+def coalesce(*cols: "ColumnOrName") -> Column:
+    """Returns the first column that is not null.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        list of columns to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        value of the first column that is not null.
+
+    Examples
+    --------
+    >>> cDf = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
+    >>> cDf.show()
+    +----+----+
+    |   a|   b|
+    +----+----+
+    |null|null|
+    |   1|null|
+    |null|   2|
+    +----+----+
+
+    >>> cDf.select(coalesce(cDf["a"], cDf["b"])).show()
+    +--------------+
+    |coalesce(a, b)|
+    +--------------+
+    |          null|
+    |             1|
+    |             2|
+    +--------------+
+
+    >>> cDf.select('*', coalesce(cDf["a"], lit(0.0))).show()
+    +----+----+----------------+
+    |   a|   b|coalesce(a, 0.0)|
+    +----+----+----------------+
+    |null|null|             0.0|
+    |   1|null|             1.0|
+    |null|   2|             0.0|
+    +----+----+----------------+
+    """
+    return _invoke_function_over_columns("coalesce", *cols)
+
+
+def expr(str: str) -> Column:
+    """Parses the expression string into the column that it represents
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    str : str
+        expression defined in string.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        column representing the expression.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([["Alice"], ["Bob"]], ["name"])
+    >>> df.select("name", expr("length(name)")).show()
+    +-----+------------+
+    | name|length(name)|
+    +-----+------------+
+    |Alice|           5|
+    |  Bob|           3|
+    +-----+------------+
+    """
+    return Column(SQLExpression(str))
+
+
+def greatest(*cols: "ColumnOrName") -> Column:
+    """
+    Returns the greatest value of the list of column names, skipping null values.
+    This function takes at least 2 parameters. It will return null if all parameters are null.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        columns to check for greatest value.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        greatest value.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
+    >>> df.select(greatest(df.a, df.b, df.c).alias("greatest")).collect()
+    [Row(greatest=4)]
+    """
+    if len(cols) < 2:
+        raise ValueError("greatest should take at least two columns")
+    return _invoke_function_over_columns("greatest", *cols)
+
+
+def input_file_name() -> Column:
+    """
+    Creates a string column for the file name of the current Spark task.
+
+    .. versionadded:: 3.4.0
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        file names.
+
+    Examples
+    --------
+    >>> import os
+    >>> path = os.path.abspath(__file__)
+    >>> df = spark.read.text(path)
+    >>> df.select(input_file_name()).first()
+    Row(input_file_name()='file:///...')
+    """
+    return _invoke_function("input_file_name")
+
+
+def least(*cols: "ColumnOrName") -> Column:
+    """
+    Returns the least value of the list of column names, skipping null values.
+    This function takes at least 2 parameters. It will return null if all parameters are null.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        column names or columns to be compared
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        least value.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
+    >>> df.select(least(df.a, df.b, df.c).alias("least")).collect()
+    [Row(least=1)]
+    """
+    if len(cols) < 2:
+        raise ValueError("least should take at least two columns")
+    return _invoke_function_over_columns("least", *cols)
+
+
+def isnan(col: "ColumnOrName") -> Column:
+    """An expression that returns true if the column is NaN.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        True if value is NaN and False otherwise.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
+    >>> df.select("a", "b", isnan("a").alias("r1"), isnan(df.b).alias("r2")).show()
+    +---+---+-----+-----+
+    |  a|  b|   r1|   r2|
+    +---+---+-----+-----+
+    |1.0|NaN|false| true|
+    |NaN|2.0| true|false|
+    +---+---+-----+-----+
+    """
+    return _invoke_function_over_columns("isnan", col)
+
+
+def isnull(col: "ColumnOrName") -> Column:
+    """An expression that returns true if the column is null.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        True if value is null and False otherwise.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, None), (None, 2)], ("a", "b"))
+    >>> df.select("a", "b", isnull("a").alias("r1"), 
isnull(df.b).alias("r2")).show()
+    +----+----+-----+-----+
+    |   a|   b|   r1|   r2|
+    +----+----+-----+-----+
+    |   1|null|false| true|
+    |null|   2| true|false|
+    +----+----+-----+-----+
+    """
+    return _invoke_function_over_columns("isnull", col)
+
+
+def monotonically_increasing_id() -> Column:
+    """A column that generates monotonically increasing 64-bit integers.
+
+    The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
+    The current implementation puts the partition ID in the upper 31 bits, and the record number
+    within each partition in the lower 33 bits. The assumption is that the data frame has
+    less than 1 billion partitions, and each partition has less than 8 billion records.
+
+    .. versionadded:: 3.4.0
+
+    Notes
+    -----
+    The function is non-deterministic because its result depends on partition IDs.
+
+    As an example, consider a :class:`DataFrame` with two partitions, each with 3 records.
+    This expression would return the following IDs:
+    0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a column of monotonically increasing 64-bit integers.
+
+    Examples
+    --------
+    >>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
+    >>> df0.select(monotonically_increasing_id().alias('id')).collect()
+    [Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
+    """
+    return _invoke_function("monotonically_increasing_id")
+
+
+def nanvl(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
+    """Returns col1 if it is not NaN, or col2 if col1 is NaN.
+
+    Both inputs should be floating point columns (:class:`DoubleType` or :class:`FloatType`).
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col1 : :class:`~pyspark.sql.Column` or str
+        first column to check.
+    col2 : :class:`~pyspark.sql.Column` or str
+        second column to return if first is NaN.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        value from first column or second if first is NaN.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
+    >>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect()
+    [Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)]
+    """
+    return _invoke_function_over_columns("nanvl", col1, col2)
+
+
+def rand(seed: Optional[int] = None) -> Column:
+    """Generates a random column with independent and identically distributed 
(i.i.d.) samples
+    uniformly distributed in [0.0, 1.0).
+
+    .. versionadded:: 3.4.0
+
+    Notes
+    -----
+    The function is non-deterministic in the general case.
+
+    Parameters
+    ----------
+    seed : int (default: None)
+        seed value for random generator.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        random values.
+
+    Examples
+    --------
+    >>> df = spark.range(2)
+    >>> df.withColumn('rand', rand(seed=42) * 3).show() # doctest: +SKIP
+    +---+------------------+
+    | id|              rand|
+    +---+------------------+
+    |  0|1.4385751892400076|
+    |  1|1.7082186019706387|
+    +---+------------------+
+    """
+    if seed is not None:
+        return _invoke_function("rand", lit(seed))
+    else:
+        return _invoke_function("rand")
+
+
+def randn(seed: Optional[int] = None) -> Column:
+    """Generates a column with independent and identically distributed 
(i.i.d.) samples from
+    the standard normal distribution.
+
+    .. versionadded:: 3.4.0
+
+    Notes
+    -----
+    The function is non-deterministic in the general case.
+
+    Parameters
+    ----------
+    seed : int (default: None)
+        seed value for random generator.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        random values.
+
+    Examples
+    --------
+    >>> df = spark.range(2)
+    >>> df.withColumn('randn', randn(seed=42)).show() # doctest: +SKIP
+    +---+--------------------+
+    | id|               randn|
+    +---+--------------------+
+    |  0|-0.04167221574820542|
+    |  1| 0.15241403986452778|
+    +---+--------------------+
+    """
+    if seed is not None:
+        return _invoke_function("randn", lit(seed))
+    else:
+        return _invoke_function("randn")
+
+
+def spark_partition_id() -> Column:
+    """A column for partition ID.
+
+    .. versionadded:: 3.4.0
+
+    Notes
+    -----
+    This is non-deterministic because it depends on data partitioning and task scheduling.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        partition id the record belongs to.
+
+    Examples
+    --------
+    >>> df = spark.range(2)
+    >>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
+    [Row(pid=0), Row(pid=0)]
+    """
+    return _invoke_function("spark_partition_id")
+
+
+# TODO(SPARK-41319): Support case-when in Column
+# def when(condition: Column, value: Any) -> Column:
+#     """Evaluates a list of conditions and returns one of multiple possible 
result expressions.
+#     If :func:`pyspark.sql.Column.otherwise` is not invoked, None is returned 
for unmatched
+#     conditions.
+#
+#     .. versionadded:: 3.4.0
+#
+#     Parameters
+#     ----------
+#     condition : :class:`~pyspark.sql.Column`
+#         a boolean :class:`~pyspark.sql.Column` expression.
+#     value :
+#         a literal value, or a :class:`~pyspark.sql.Column` expression.
+#
+#     Returns
+#     -------
+#     :class:`~pyspark.sql.Column`
+#         column representing when expression.
+#
+#     Examples
+#     --------
+#     >>> df = spark.range(3)
+#     >>> df.select(when(df['id'] == 2, 3).otherwise(4).alias("age")).show()
+#     +---+
+#     |age|
+#     +---+
+#     |  4|
+#     |  4|
+#     |  3|
+#     +---+
+#
+#     >>> df.select(when(df.id == 2, df.id + 1).alias("age")).show()
+#     +----+
+#     | age|
+#     +----+
+#     |null|
+#     |null|
+#     |   3|
+#     +----+
+#     """
+#     # Explicitly not using ColumnOrName type here to make reading condition less opaque
+#     if not isinstance(condition, Column):
+#         raise TypeError("condition should be a Column")
+#     v = value._jc if isinstance(value, Column) else value
+#
+#     return _invoke_function("when", condition._jc, v)
+
+
 # Sort Functions
 
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index a25f2b2ccfa..f2c9b188985 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -97,6 +97,89 @@ class SparkConnectFunctionTests(SparkConnectSQLTestCase):
     """These test cases exercise the interface to the proto plan
     generation but do not call Spark."""
 
+    def test_normal_functions(self):
+        from pyspark.sql import functions as SF
+        from pyspark.sql.connect import functions as CF
+
+        query = """
+            SELECT * FROM VALUES
+            (0, float("NAN"), NULL), (1, NULL, 2.0), (2, 2.1, 3.5)
+            AS tab(a, b, c)
+            """
+        # +---+----+----+
+        # |  a|   b|   c|
+        # +---+----+----+
+        # |  0| NaN|null|
+        # |  1|null| 2.0|
+        # |  2| 2.1| 3.5|
+        # +---+----+----+
+
+        cdf = self.connect.sql(query)
+        sdf = self.spark.sql(query)
+
+        self.assert_eq(
+            cdf.select(CF.bitwise_not(cdf.a)).toPandas(),
+            sdf.select(SF.bitwise_not(sdf.a)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.coalesce(cdf.a, "b", cdf.c)).toPandas(),
+            sdf.select(SF.coalesce(sdf.a, "b", sdf.c)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.expr("a + b - c")).toPandas(),
+            sdf.select(SF.expr("a + b - c")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.greatest(cdf.a, "b", cdf.c)).toPandas(),
+            sdf.select(SF.greatest(sdf.a, "b", sdf.c)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.isnan(cdf.a), CF.isnan("b")).toPandas(),
+            sdf.select(SF.isnan(sdf.a), SF.isnan("b")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.isnull(cdf.a), CF.isnull("b")).toPandas(),
+            sdf.select(SF.isnull(sdf.a), SF.isnull("b")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.input_file_name()).toPandas(),
+            sdf.select(SF.input_file_name()).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.least(cdf.a, "b", cdf.c)).toPandas(),
+            sdf.select(SF.least(sdf.a, "b", sdf.c)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.monotonically_increasing_id()).toPandas(),
+            sdf.select(SF.monotonically_increasing_id()).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.nanvl("b", cdf.c)).toPandas(),
+            sdf.select(SF.nanvl("b", sdf.c)).toPandas(),
+        )
+        # Cannot compare the values due to the random seed
+        self.assertEqual(
+            cdf.select(CF.rand()).count(),
+            sdf.select(SF.rand()).count(),
+        )
+        self.assert_eq(
+            cdf.select(CF.rand(100)).toPandas(),
+            sdf.select(SF.rand(100)).toPandas(),
+        )
+        # Cannot compare the values due to the random seed
+        self.assertEqual(
+            cdf.select(CF.randn()).count(),
+            sdf.select(SF.randn()).count(),
+        )
+        self.assert_eq(
+            cdf.select(CF.randn(100)).toPandas(),
+            sdf.select(SF.randn(100)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.spark_partition_id()).toPandas(),
+            sdf.select(SF.spark_partition_id()).toPandas(),
+        )
+
     def test_sorting_functions_with_column(self):
         from pyspark.sql.connect import functions as CF
         from pyspark.sql.connect.column import Column

