This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8355d8b7c76 [SPARK-41363][CONNECT][PYTHON] Implement normal functions
8355d8b7c76 is described below

commit 8355d8b7c768bc2728501849e21ac34f44955fd5
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Mon Dec 5 09:50:23 2022 +0900

    [SPARK-41363][CONNECT][PYTHON] Implement normal functions

    ### What changes were proposed in this pull request?
    Implement [the normal functions](https://github.com/apache/spark/blob/master/python/docs/source/reference/pyspark.sql/functions.rst#normal-functions), except:
    - `bitwiseNOT` - deprecated since 3.2.0
    - `broadcast` - depends on `DataFrame.hint` being added first
    - `when` - depends on `CaseWhen` in the proto message and `Column.{when, otherwise}`

    ### Why are the changes needed?
    For API coverage.

    ### Does this PR introduce _any_ user-facing change?
    Yes, new APIs.

    ### How was this patch tested?
    Added UT.

    Closes #38884 from zhengruifeng/connect_function_normal.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
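For a quick feel of the new surface, the added functions can be exercised
through a Spark Connect session the same way the new unit test does (a minimal
sketch, assuming a Connect session bound to the name `spark`; the session
setup itself is not part of this patch):

    from pyspark.sql.connect import functions as CF

    df = spark.sql(
        'SELECT * FROM VALUES (0, float("NAN"), NULL), (1, NULL, 2.0), (2, 2.1, 3.5) AS tab(a, b, c)'
    )
    print(
        df.select(
            CF.coalesce(df.a, CF.lit(0.0)),  # first non-null value per row
            CF.isnan("b"),                   # NaN check on a float column
            CF.isnull(df.c),                 # null check
            CF.greatest(df.a, "b", df.c),    # row-wise max, skipping nulls
            CF.least(df.a, "b", df.c),       # row-wise min, skipping nulls
        ).toPandas()
    )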
---
 python/pyspark/sql/connect/functions.py            | 505 ++++++++++++++++++++-
 .../sql/tests/connect/test_connect_function.py     |  83 ++++
 2 files changed, 587 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index 76991fca2bc..9b1163a1bdf 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -20,9 +20,10 @@ from pyspark.sql.connect.column import (
     LiteralExpression,
     ColumnReference,
     UnresolvedFunction,
+    SQLExpression,
 )

-from typing import Any, TYPE_CHECKING, Union, List
+from typing import Any, TYPE_CHECKING, Union, List, Optional

 if TYPE_CHECKING:
     from pyspark.sql.connect._typing import ColumnOrName
@@ -78,6 +79,9 @@ def _invoke_binary_math_function(name: str, col1: Any, col2: Any) -> Column:
     return _invoke_function(name, *_cols)


+# Normal Functions
+
+
 def col(col: str) -> Column:
     return Column(ColumnReference(col))

@@ -89,6 +93,505 @@ def lit(col: Any) -> Column:
     return Column(LiteralExpression(col))


+# def bitwiseNOT(col: "ColumnOrName") -> Column:
+#     """
+#     Computes bitwise not.
+#
+#     .. versionadded:: 1.4.0
+#
+#     .. deprecated:: 3.2.0
+#         Use :func:`bitwise_not` instead.
+#     """
+#     warnings.warn("Deprecated in 3.2, use bitwise_not instead.", FutureWarning)
+#     return bitwise_not(col)
+
+
+def bitwise_not(col: "ColumnOrName") -> Column:
+    """
+    Computes bitwise not.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+
+    Examples
+    --------
+    >>> df = spark.range(1)
+    >>> df.select(bitwise_not(lit(0))).show()
+    +---+
+    | ~0|
+    +---+
+    | -1|
+    +---+
+    >>> df.select(bitwise_not(lit(1))).show()
+    +---+
+    | ~1|
+    +---+
+    | -2|
+    +---+
+    """
+    return _invoke_function_over_columns("~", col)
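+
+
+# A rough sketch of the helper used above (an illustrative assumption: the real
+# `_invoke_function_over_columns` is defined earlier in this module, outside
+# this hunk). Mirroring `_invoke_binary_math_function`, it resolves string
+# names to column references and wraps the call in an `UnresolvedFunction`:
+#
+#     def _invoke_function_over_columns(name: str, *cols: "ColumnOrName") -> Column:
+#         _cols = [c if isinstance(c, Column) else col(c) for c in cols]
+#         return _invoke_function(name, *_cols)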
+
+
+# TODO(SPARK-41364): support broadcast
+# def broadcast(df: DataFrame) -> DataFrame:
+#     """
+#     Marks a DataFrame as small enough for use in broadcast joins.
+#
+#     .. versionadded:: 1.6.0
+#
+#     Returns
+#     -------
+#     :class:`~pyspark.sql.DataFrame`
+#         DataFrame marked as ready for broadcast join.
+#
+#     Examples
+#     --------
+#     >>> from pyspark.sql import types
+#     >>> df = spark.createDataFrame([1, 2, 3, 3, 4], types.IntegerType())
+#     >>> df_small = spark.range(3)
+#     >>> df_b = broadcast(df_small)
+#     >>> df.join(df_b, df.value == df_small.id).show()
+#     +-----+---+
+#     |value| id|
+#     +-----+---+
+#     |    1|  1|
+#     |    2|  2|
+#     +-----+---+
+#     """
+#
+#     sc = SparkContext._active_spark_context
+#     assert sc is not None and sc._jvm is not None
+#     return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sparkSession)
+
+
+def coalesce(*cols: "ColumnOrName") -> Column:
+    """Returns the first column that is not null.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        list of columns to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        value of the first column that is not null.
+
+    Examples
+    --------
+    >>> cDf = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
+    >>> cDf.show()
+    +----+----+
+    |   a|   b|
+    +----+----+
+    |null|null|
+    |   1|null|
+    |null|   2|
+    +----+----+
+
+    >>> cDf.select(coalesce(cDf["a"], cDf["b"])).show()
+    +--------------+
+    |coalesce(a, b)|
+    +--------------+
+    |          null|
+    |             1|
+    |             2|
+    +--------------+
+
+    >>> cDf.select('*', coalesce(cDf["a"], lit(0.0))).show()
+    +----+----+----------------+
+    |   a|   b|coalesce(a, 0.0)|
+    +----+----+----------------+
+    |null|null|             0.0|
+    |   1|null|             1.0|
+    |null|   2|             0.0|
+    +----+----+----------------+
+    """
+    return _invoke_function_over_columns("coalesce", *cols)
+
+
+def expr(str: str) -> Column:
+    """Parses the expression string into the column that it represents.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    str : str
+        expression defined in string.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        column representing the expression.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([["Alice"], ["Bob"]], ["name"])
+    >>> df.select("name", expr("length(name)")).show()
+    +-----+------------+
+    | name|length(name)|
+    +-----+------------+
+    |Alice|           5|
+    |  Bob|           3|
+    +-----+------------+
+    """
+    return Column(SQLExpression(str))
+
+
+def greatest(*cols: "ColumnOrName") -> Column:
+    """
+    Returns the greatest value of the list of column names, skipping null values.
+    This function takes at least 2 parameters. It will return null if all parameters are null.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        columns to check for the greatest value.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        greatest value.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
+    >>> df.select(greatest(df.a, df.b, df.c).alias("greatest")).collect()
+    [Row(greatest=4)]
+    """
+    if len(cols) < 2:
+        raise ValueError("greatest should take at least two columns")
+    return _invoke_function_over_columns("greatest", *cols)
+
+
+def input_file_name() -> Column:
+    """
+    Creates a string column for the file name of the current Spark task.
+
+    .. versionadded:: 3.4.0
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        file names.
+
+    Examples
+    --------
+    >>> import os
+    >>> path = os.path.abspath(__file__)
+    >>> df = spark.read.text(path)
+    >>> df.select(input_file_name()).first()
+    Row(input_file_name()='file:///...')
+    """
+    return _invoke_function("input_file_name")
+
+
+def least(*cols: "ColumnOrName") -> Column:
+    """
+    Returns the least value of the list of column names, skipping null values.
+    This function takes at least 2 parameters. It will return null if all parameters are null.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        column names or columns to be compared.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        least value.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
+    >>> df.select(least(df.a, df.b, df.c).alias("least")).collect()
+    [Row(least=1)]
+    """
+    if len(cols) < 2:
+        raise ValueError("least should take at least two columns")
+    return _invoke_function_over_columns("least", *cols)
+
+
+def isnan(col: "ColumnOrName") -> Column:
+    """An expression that returns true if the column is NaN.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        True if value is NaN and False otherwise.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
+    >>> df.select("a", "b", isnan("a").alias("r1"), isnan(df.b).alias("r2")).show()
+    +---+---+-----+-----+
+    |  a|  b|   r1|   r2|
+    +---+---+-----+-----+
+    |1.0|NaN|false| true|
+    |NaN|2.0| true|false|
+    +---+---+-----+-----+
+    """
+    return _invoke_function_over_columns("isnan", col)
+
+
+def isnull(col: "ColumnOrName") -> Column:
+    """An expression that returns true if the column is null.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        True if value is null and False otherwise.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, None), (None, 2)], ("a", "b"))
+    >>> df.select("a", "b", isnull("a").alias("r1"), isnull(df.b).alias("r2")).show()
+    +----+----+-----+-----+
+    |   a|   b|   r1|   r2|
+    +----+----+-----+-----+
+    |   1|null|false| true|
+    |null|   2| true|false|
+    +----+----+-----+-----+
+    """
+    return _invoke_function_over_columns("isnull", col)
+
+
+def monotonically_increasing_id() -> Column:
+    """A column that generates monotonically increasing 64-bit integers.
+
+    The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
+    The current implementation puts the partition ID in the upper 31 bits, and the record number
+    within each partition in the lower 33 bits. The assumption is that the data frame has
+    less than 1 billion partitions, and each partition has less than 8 billion records.
+
+    .. versionadded:: 3.4.0
+
+    Notes
+    -----
+    The function is non-deterministic because its result depends on partition IDs.
+
+    As an example, consider a :class:`DataFrame` with two partitions, each with 3 records.
+    This expression would return the following IDs:
+    0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a column of monotonically increasing unique 64-bit integers.
+
+    Examples
+    --------
+    >>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
+    >>> df0.select(monotonically_increasing_id().alias('id')).collect()
+    [Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
+    """
+    return _invoke_function("monotonically_increasing_id")
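+
+
+# The 31/33-bit layout described above, worked through (illustrative arithmetic
+# only; `_compose_id` is a hypothetical helper, not part of this module):
+#
+#     def _compose_id(partition_id: int, record_index: int) -> int:
+#         # partition ID in the upper 31 bits, per-partition record number
+#         # in the lower 33 bits
+#         return (partition_id << 33) | record_index
+#
+#     assert _compose_id(0, 2) == 2
+#     assert _compose_id(1, 0) == 8589934592  # 1 << 33
+#     assert _compose_id(1, 2) == 8589934594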
+
+
+def nanvl(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
+    """Returns col1 if it is not NaN, or col2 if col1 is NaN.
+
+    Both inputs should be floating point columns (:class:`DoubleType` or :class:`FloatType`).
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col1 : :class:`~pyspark.sql.Column` or str
+        first column to check.
+    col2 : :class:`~pyspark.sql.Column` or str
+        second column to return if first is NaN.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        value from the first column, or from the second if the first is NaN.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
+    >>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect()
+    [Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)]
+    """
+    return _invoke_function_over_columns("nanvl", col1, col2)
+
+
+def rand(seed: Optional[int] = None) -> Column:
+    """Generates a random column with independent and identically distributed (i.i.d.) samples
+    uniformly distributed in [0.0, 1.0).
+
+    .. versionadded:: 3.4.0
+
+    Notes
+    -----
+    The function is non-deterministic in the general case.
+
+    Parameters
+    ----------
+    seed : int (default: None)
+        seed value for random generator.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        random values.
+
+    Examples
+    --------
+    >>> df = spark.range(2)
+    >>> df.withColumn('rand', rand(seed=42) * 3).show() # doctest: +SKIP
+    +---+------------------+
+    | id|              rand|
+    +---+------------------+
+    |  0|1.4385751892400076|
+    |  1|1.7082186019706387|
+    +---+------------------+
+    """
+    if seed is not None:
+        return _invoke_function("rand", lit(seed))
+    else:
+        return _invoke_function("rand")
+
+
+def randn(seed: Optional[int] = None) -> Column:
+    """Generates a column with independent and identically distributed (i.i.d.) samples from
+    the standard normal distribution.
+
+    .. versionadded:: 3.4.0
+
+    Notes
+    -----
+    The function is non-deterministic in the general case.
+
+    Parameters
+    ----------
+    seed : int (default: None)
+        seed value for random generator.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        random values.
+
+    Examples
+    --------
+    >>> df = spark.range(2)
+    >>> df.withColumn('randn', randn(seed=42)).show() # doctest: +SKIP
+    +---+--------------------+
+    | id|               randn|
+    +---+--------------------+
+    |  0|-0.04167221574820542|
+    |  1| 0.15241403986452778|
+    +---+--------------------+
+    """
+    if seed is not None:
+        return _invoke_function("randn", lit(seed))
+    else:
+        return _invoke_function("randn")
+
+
+def spark_partition_id() -> Column:
+    """A column for partition ID.
+
+    .. versionadded:: 3.4.0
+
+    Notes
+    -----
+    This is non-deterministic because it depends on data partitioning and task scheduling.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        partition id the record belongs to.
+
+    Examples
+    --------
+    >>> df = spark.range(2)
+    >>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
+    [Row(pid=0), Row(pid=0)]
+    """
+    return _invoke_function("spark_partition_id")
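+
+
+# Why the seeded variants matter for testing (a doctest-style sketch, assuming
+# a session `spark`; the equality holds only for a fixed plan and partitioning):
+#
+#     >>> a = spark.range(2).select(rand(100)).collect()  # doctest: +SKIP
+#     >>> b = spark.range(2).select(rand(100)).collect()  # doctest: +SKIP
+#     >>> a == b  # doctest: +SKIP
+#     True
+#
+# Without a seed, one is chosen at random, so only row counts -- not values --
+# can be compared, which is how the new unit test treats rand()/randn().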
+
+
+# TODO(SPARK-41319): Support case-when in Column
+# def when(condition: Column, value: Any) -> Column:
+#     """Evaluates a list of conditions and returns one of multiple possible result expressions.
+#     If :func:`pyspark.sql.Column.otherwise` is not invoked, None is returned for unmatched
+#     conditions.
+#
+#     .. versionadded:: 3.4.0
+#
+#     Parameters
+#     ----------
+#     condition : :class:`~pyspark.sql.Column`
+#         a boolean :class:`~pyspark.sql.Column` expression.
+#     value :
+#         a literal value, or a :class:`~pyspark.sql.Column` expression.
+#
+#     Returns
+#     -------
+#     :class:`~pyspark.sql.Column`
+#         column representing when expression.
+#
+#     Examples
+#     --------
+#     >>> df = spark.range(3)
+#     >>> df.select(when(df['id'] == 2, 3).otherwise(4).alias("age")).show()
+#     +---+
+#     |age|
+#     +---+
+#     |  4|
+#     |  4|
+#     |  3|
+#     +---+
+#
+#     >>> df.select(when(df.id == 2, df.id + 1).alias("age")).show()
+#     +----+
+#     | age|
+#     +----+
+#     |null|
+#     |null|
+#     |   3|
+#     +----+
+#     """
+#     # Explicitly not using ColumnOrName type here to make reading condition less opaque
+#     if not isinstance(condition, Column):
+#         raise TypeError("condition should be a Column")
+#     v = value._jc if isinstance(value, Column) else value
+#
+#     return _invoke_function("when", condition._jc, v)
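+
+
+# Until SPARK-41319 lands, a CASE WHEN can already be written through the new
+# `expr` function added above (a sketch, assuming a session `spark`):
+#
+#     >>> df = spark.range(3)
+#     >>> df.select(expr("CASE WHEN id = 2 THEN 3 ELSE 4 END AS age")).show()  # doctest: +SKIP
+#     +---+
+#     |age|
+#     +---+
+#     |  4|
+#     |  4|
+#     |  3|
+#     +---+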
+
+
 # Sort Functions

diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index a25f2b2ccfa..f2c9b188985 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -97,6 +97,89 @@ class SparkConnectFunctionTests(SparkConnectSQLTestCase):
     """These test cases exercise the interface to the proto plan generation but do not call
     Spark."""

+    def test_normal_functions(self):
+        from pyspark.sql import functions as SF
+        from pyspark.sql.connect import functions as CF
+
+        query = """
+            SELECT * FROM VALUES
+            (0, float("NAN"), NULL), (1, NULL, 2.0), (2, 2.1, 3.5)
+            AS tab(a, b, c)
+            """
+        # +---+----+----+
+        # |  a|   b|   c|
+        # +---+----+----+
+        # |  0| NaN|null|
+        # |  1|null| 2.0|
+        # |  2| 2.1| 3.5|
+        # +---+----+----+
+
+        cdf = self.connect.sql(query)
+        sdf = self.spark.sql(query)
+
+        self.assert_eq(
+            cdf.select(CF.bitwise_not(cdf.a)).toPandas(),
+            sdf.select(SF.bitwise_not(sdf.a)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.coalesce(cdf.a, "b", cdf.c)).toPandas(),
+            sdf.select(SF.coalesce(sdf.a, "b", sdf.c)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.expr("a + b - c")).toPandas(),
+            sdf.select(SF.expr("a + b - c")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.greatest(cdf.a, "b", cdf.c)).toPandas(),
+            sdf.select(SF.greatest(sdf.a, "b", sdf.c)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.isnan(cdf.a), CF.isnan("b")).toPandas(),
+            sdf.select(SF.isnan(sdf.a), SF.isnan("b")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.isnull(cdf.a), CF.isnull("b")).toPandas(),
+            sdf.select(SF.isnull(sdf.a), SF.isnull("b")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.input_file_name()).toPandas(),
+            sdf.select(SF.input_file_name()).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.least(cdf.a, "b", cdf.c)).toPandas(),
+            sdf.select(SF.least(sdf.a, "b", sdf.c)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.monotonically_increasing_id()).toPandas(),
+            sdf.select(SF.monotonically_increasing_id()).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.nanvl("b", cdf.c)).toPandas(),
+            sdf.select(SF.nanvl("b", sdf.c)).toPandas(),
+        )
+        # Cannot compare the values due to the random seed
+        self.assertEqual(
+            cdf.select(CF.rand()).count(),
+            sdf.select(SF.rand()).count(),
+        )
+        self.assert_eq(
+            cdf.select(CF.rand(100)).toPandas(),
+            sdf.select(SF.rand(100)).toPandas(),
+        )
+        # Cannot compare the values due to the random seed
+        self.assertEqual(
+            cdf.select(CF.randn()).count(),
+            sdf.select(SF.randn()).count(),
+        )
+        self.assert_eq(
+            cdf.select(CF.randn(100)).toPandas(),
+            sdf.select(SF.randn(100)).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.spark_partition_id()).toPandas(),
+            sdf.select(SF.spark_partition_id()).toPandas(),
+        )
+
     def test_sorting_functions_with_column(self):
         from pyspark.sql.connect import functions as CF
         from pyspark.sql.connect.column import Column