[spark] branch master updated: [SPARK-41700][CONNECT][PYTHON] Remove `FunctionBuilder`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d5865d0c085 [SPARK-41700][CONNECT][PYTHON] Remove `FunctionBuilder`
d5865d0c085 is described below

commit d5865d0c085cc41b39e0d615970bce7da270def6
Author: Ruifeng Zheng
AuthorDate: Sun Dec 25 13:33:02 2022 +0800

    [SPARK-41700][CONNECT][PYTHON] Remove `FunctionBuilder`

    ### What changes were proposed in this pull request?
    Remove `FunctionBuilder`.

    ### Why are the changes needed?
    Almost all the functions are now supported natively, so `FunctionBuilder` is no longer needed.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Updated UT.

    Closes #39204 from zhengruifeng/connect_remove_FunctionBuilder.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Ruifeng Zheng
---
 python/pyspark/sql/connect/_typing.py                  |  5 -----
 python/pyspark/sql/connect/column.py                   | 21 ++++++++++-----------
 python/pyspark/sql/connect/expressions.py              |  4 ++++
 python/pyspark/sql/connect/function_builder.py         | 15 ---------------
 python/pyspark/sql/tests/connect/test_connect_basic.py |  4 ++--
 5 files changed, 16 insertions(+), 33 deletions(-)

diff --git a/python/pyspark/sql/connect/_typing.py b/python/pyspark/sql/connect/_typing.py
index 4962df1c7d5..29a14384c82 100644
--- a/python/pyspark/sql/connect/_typing.py
+++ b/python/pyspark/sql/connect/_typing.py
@@ -42,11 +42,6 @@ DecimalLiteral = decimal.Decimal
 DateTimeLiteral = Union[datetime.datetime, datetime.date]


-class FunctionBuilderCallable(Protocol):
-    def __call__(self, *_: ColumnOrName) -> Column:
-        ...
-
-
 class UserDefinedFunctionCallable(Protocol):
     def __call__(self, *_: ColumnOrName) -> Column:
         ...
diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py
index 440e559f365..f1107b507bc 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -202,9 +202,6 @@ class Column:
         ...

     def substr(self, startPos: Union[int, "Column"], length: Union[int, "Column"]) -> "Column":
-        from pyspark.sql.connect.function_builder import functions as F
-        from pyspark.sql.connect.functions import lit
-
         if type(startPos) != type(length):
             raise TypeError(
                 "startPos and length must be the same type. "
@@ -214,19 +211,21 @@ class Column:
                 )
             )

-        if isinstance(length, int):
-            length_exp = lit(length)
-        elif isinstance(length, Column):
-            length_exp = length
+        if isinstance(length, Column):
+            length_expr = length._expr
+        elif isinstance(length, int):
+            length_expr = LiteralExpression._from_value(length)
         else:
             raise TypeError("Unsupported type for substr().")

-        if isinstance(startPos, int):
-            start_exp = lit(startPos)
+        if isinstance(startPos, Column):
+            start_expr = startPos._expr
+        elif isinstance(startPos, int):
+            start_expr = LiteralExpression._from_value(startPos)
         else:
-            start_exp = startPos
+            raise TypeError("Unsupported type for substr().")

-        return F.substr(self, start_exp, length_exp)
+        return Column(UnresolvedFunction("substring", [self._expr, start_expr, length_expr]))

     substr.__doc__ = PySparkColumn.substr.__doc__
diff --git a/python/pyspark/sql/connect/expressions.py b/python/pyspark/sql/connect/expressions.py
index 0c2717d7fe3..02d6047bd66 100644
--- a/python/pyspark/sql/connect/expressions.py
+++ b/python/pyspark/sql/connect/expressions.py
@@ -261,6 +261,10 @@ class LiteralExpression(Expression):
         else:
             raise ValueError(f"Unsupported Data Type {type(value).__name__}")

+    @classmethod
+    def _from_value(cls, value: Any) -> "LiteralExpression":
+        return LiteralExpression(value=value, dataType=LiteralExpression._infer_type(value))
+
     def to_plan(self, session: "SparkConnectClient") -> "proto.Expression":
         """Converts the literal expression to the literal in proto."""
diff --git a/python/pyspark/sql/connect/function_builder.py b/python/pyspark/sql/connect/function_builder.py
index 7be43353b9e..7eb0ffc26ae 100644
--- a/python/pyspark/sql/connect/function_builder.py
+++ b/python/pyspark/sql/connect/function_builder.py
@@ -29,7 +29,6 @@ from pyspark.sql.connect.functions import col
 if TYPE_CHECKING:
     from pyspark.sql.connect._typing import (
         ColumnOrName,
-        FunctionBuilderCallable,
         UserDefinedFunctionCallable,
     )
     from pyspark.sql.connect.client import SparkConnectClient
@@ -51,20 +50,6 @@ def _build(name: str, *args: "ColumnOrName") -> Column:
     return
[spark] branch master updated: [SPARK-41503][CONNECT][PYTHON] Implement Partition Transformation Functions
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 110c0575448 [SPARK-41503][CONNECT][PYTHON] Implement Partition Transformation Functions
110c0575448 is described below

commit 110c0575448c63c1d40e670e8e27b7ee6fb74907
Author: Ruifeng Zheng
AuthorDate: Sat Dec 24 13:11:12 2022 -0800

    [SPARK-41503][CONNECT][PYTHON] Implement Partition Transformation Functions

    ### What changes were proposed in this pull request?
    Implement [Partition Transformation Functions](https://github.com/apache/spark/blob/master/python/docs/source/reference/pyspark.sql/functions.rst#partition-transformation-functions).

    ### Why are the changes needed?
    For API coverage.

    ### Does this PR introduce _any_ user-facing change?
    Yes.

    ### How was this patch tested?
    Existing UT.

    Closes #39203 from zhengruifeng/connect_function_transform_partition.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Dongjoon Hyun
---
 .../sql/connect/planner/SparkConnectPlanner.scala | 21 +++++++++++++++
 python/pyspark/sql/connect/functions.py           | 44 ++++++++++++++++++++++
 2 files changed, 65 insertions(+)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dce3a8c8e55..cb8d30b180c 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -646,6 +646,27 @@ class SparkConnectPlanner(session: SparkSession) {
         }
         Some(NthValue(children(0), children(1), ignoreNulls))

+      case "bucket" if fun.getArgumentsCount == 2 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        (children.head, children.last) match {
+          case (numBuckets: Literal, child) if numBuckets.dataType == IntegerType =>
+            Some(Bucket(numBuckets, child))
+          case (other, _) =>
+            throw InvalidPlanInput(s"numBuckets should be a literal integer, but got $other")
+        }
+
+      case "years" if fun.getArgumentsCount == 1 =>
+        Some(Years(transformExpression(fun.getArguments(0))))
+
+      case "months" if fun.getArgumentsCount == 1 =>
+        Some(Months(transformExpression(fun.getArguments(0))))
+
+      case "days" if fun.getArgumentsCount == 1 =>
+        Some(Days(transformExpression(fun.getArguments(0))))
+
+      case "hours" if fun.getArgumentsCount == 1 =>
+        Some(Hours(transformExpression(fun.getArguments(0))))
+
       case _ => None
     }
   }
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index c2b1c7d61c8..407e7536f03 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2070,6 +2070,50 @@ def timestamp_seconds(col: "ColumnOrName") -> Column:
 timestamp_seconds.__doc__ = pysparkfuncs.timestamp_seconds.__doc__


+# Partition Transformation Functions
+
+
+def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
+    if isinstance(numBuckets, int):
+        _numBuckets = lit(numBuckets)
+    elif isinstance(numBuckets, Column):
+        _numBuckets = numBuckets
+    else:
+        raise TypeError("numBuckets should be a Column or an int, got {}".format(type(numBuckets)))
+
+    return _invoke_function("bucket", _numBuckets, _to_col(col))
+
+
+bucket.__doc__ = pysparkfuncs.bucket.__doc__
+
+
+def years(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("years", col)
+
+
+years.__doc__ = pysparkfuncs.years.__doc__
+
+
+def months(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("months", col)
+
+
+months.__doc__ = pysparkfuncs.months.__doc__
+
+
+def days(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("days", col)
+
+
+days.__doc__ = pysparkfuncs.days.__doc__
+
+
+def hours(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("hours", col)
+
+
+hours.__doc__ = pysparkfuncs.hours.__doc__
+
 # Misc Functions
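Note that these transforms are not general-purpose expressions; they describe table partitioning, for example through the `DataFrameWriterV2` API. A brief sketch, where the table name `catalog.events`, the `ts`/`id` columns, and a transform-capable V2 catalog (e.g. Iceberg) are all assumptions:

    from pyspark.sql.functions import bucket, col, years

    # Hypothetical DataFrame df with a timestamp column ts and a long id.
    # bucket() requires a literal integer bucket count, as enforced by
    # the planner change above.
    (df.writeTo("catalog.events")
        .partitionedBy(years(col("ts")), bucket(16, col("id")))
        .createOrReplace())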
[spark] branch master updated: [SPARK-41699][CONNECT][BUILD] Upgrade buf to v1.11.0
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f2c347e5432 [SPARK-41699][CONNECT][BUILD] Upgrade buf to v1.11.0
f2c347e5432 is described below

commit f2c347e54321fc22e79602aa487cfb7f3ba43793
Author: Ruifeng Zheng
AuthorDate: Sat Dec 24 13:07:37 2022 -0800

    [SPARK-41699][CONNECT][BUILD] Upgrade buf to v1.11.0

    ### What changes were proposed in this pull request?
    Upgrade buf to v1.11.0.

    ### Why are the changes needed?
    The upgrade does not change the generated code, so it is safe to apply.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing UT.

    Closes #39201 from zhengruifeng/connect_build_buf_1.11.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Dongjoon Hyun
---
 .github/workflows/build_and_test.yml | 2 +-
 connector/connect/README.md          | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 86753642239..5bd2fef9b0c 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -561,7 +561,7 @@ jobs:
     - name: Install dependencies for Python code generation check
       run: |
         # See more in "Installation" https://docs.buf.build/installation#tarball
-        curl -LO https://github.com/bufbuild/buf/releases/download/v1.9.0/buf-Linux-x86_64.tar.gz
+        curl -LO https://github.com/bufbuild/buf/releases/download/v1.11.0/buf-Linux-x86_64.tar.gz
        mkdir -p $HOME/buf
        tar -xvzf buf-Linux-x86_64.tar.gz -C $HOME/buf --strip-components 1
     - name: Install R linter dependencies and SparkR
diff --git a/connector/connect/README.md b/connector/connect/README.md
index e4753eef0cc..d30f65ffb5d 100644
--- a/connector/connect/README.md
+++ b/connector/connect/README.md
@@ -89,7 +89,7 @@ To use the release version of Spark Connect:
 ## Development Topics

 ### Generate proto generated files for the Python client
-1. Install `buf version 1.9.0`: https://docs.buf.build/installation
+1. Install `buf version 1.11.0`: https://docs.buf.build/installation
 2. Run `pip install grpcio==1.48.1 protobuf==3.19.5 mypy-protobuf==3.3.0`
 3. Run `./connector/connect/dev/generate_protos.sh`
 4. Optional Check `./dev/check-codegen-python.py`
[spark] branch master updated: [CORE][MINOR] Correct spelling for RPC in log
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 02fdefddcfa [CORE][MINOR] Correct spelling for RPC in log
02fdefddcfa is described below

commit 02fdefddcfa29aedc3b41548b61c4f8f0fd6c995
Author: Ted Yu
AuthorDate: Sat Dec 24 08:38:43 2022 -0600

    [CORE][MINOR] Correct spelling for RPC in log

    ### What changes were proposed in this pull request?
    This PR corrects a spelling mistake for `RPC` in the log message of `sendRpc`. A similar error in context.R is also fixed.

    ### Why are the changes needed?
    The spelling mistake confuses users.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing test suite.

    Closes #39200 from tedyu/trans-rpc.

    Authored-by: Ted Yu
    Signed-off-by: Sean Owen
---
 R/pkg/R/context.R                                                      | 2 +-
 .../java/org/apache/spark/network/client/TransportClient.java          | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index cca6c2c817d..eea83aa5ab5 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -170,7 +170,7 @@ parallelize <- function(sc, coll, numSlices = 1) {
   serializedSlices <- lapply(slices, serialize, connection = NULL)

   # The RPC backend cannot handle arguments larger than 2GB (INT_MAX)
-  # If serialized data is safely less than that threshold we send it over the PRC channel.
+  # If serialized data is safely less than that threshold we send it over the RPC channel.
   # Otherwise, we write it to a file and send the file name
   if (objectSize < sizeLimit) {
     jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index dd2fdb08ee5..4a0a1566998 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -274,7 +274,7 @@ public class TransportClient implements Closeable {
       copy.flip();
       result.set(copy);
     } catch (Throwable t) {
-      logger.warn("Error in responding PRC callback", t);
+      logger.warn("Error in responding RPC callback", t);
       result.setException(t);
     }
   }