[spark] branch master updated: [SPARK-41554] fix changing of Decimal scale when scale decreased by m…
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 838954e5080 [SPARK-41554] fix changing of Decimal scale when scale decreased by m… 838954e5080 is described below commit 838954e50807e583ceb8317877710d58acff0a4b Author: oleksii.diagiliev AuthorDate: Fri Dec 30 15:52:05 2022 +0800 [SPARK-41554] fix changing of Decimal scale when scale decreased by m… …ore than 18 ### What changes were proposed in this pull request? Fix `Decimal` scaling that is stored as compact long internally when scale decreased by more than 18. For example, ``` Decimal(1, 38, 19).changePrecision(38, 0) ``` produces an exception ``` java.lang.ArrayIndexOutOfBoundsException: 19 at org.apache.spark.sql.types.Decimal.changePrecision(Decimal.scala:377) at org.apache.spark.sql.types.Decimal.changePrecision(Decimal.scala:328) ``` Another way to reproduce it with SQL query ``` sql("select cast(cast(cast(cast(id as decimal(38,15)) as decimal(38,30)) as decimal(38,37)) as decimal(38,17)) from range(3)").show ``` The bug exists for Decimal that is stored using compact long only, it works fine with Decimal that uses `scala.math.BigDecimal` internally. ### Why are the changes needed? Not able to execute the SQL query mentioned above. Please note, for my use case the SQL query is generated programatically, so I cannot optimize it manually. ### Does this PR introduce _any_ user-facing change? Yes, it will allow scale Decimal properly that is not currently possible due to the exception. ### How was this patch tested? Tests were added. The fix affects the scale decrease only, but I decided to also include tests for scale increase as I didn't find them. Closes #39099 from fe2s/SPARK-41554-fix-decimal-scaling. Authored-by: oleksii.diagiliev Signed-off-by: Wenchen Fan --- .../scala/org/apache/spark/sql/types/Decimal.scala | 60 +- .../org/apache/spark/sql/types/DecimalSuite.scala | 52 ++- 2 files changed, 87 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index 44c00df379f..2c0b6677541 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -374,30 +374,42 @@ final class Decimal extends Ordered[Decimal] with Serializable { if (scale < _scale) { // Easier case: we just need to divide our scale down val diff = _scale - scale -val pow10diff = POW_10(diff) -// % and / always round to 0 -val droppedDigits = lv % pow10diff -lv /= pow10diff -roundMode match { - case ROUND_FLOOR => -if (droppedDigits < 0) { - lv += -1L -} - case ROUND_CEILING => -if (droppedDigits > 0) { - lv += 1L -} - case ROUND_HALF_UP => -if (math.abs(droppedDigits) * 2 >= pow10diff) { - lv += (if (droppedDigits < 0) -1L else 1L) -} - case ROUND_HALF_EVEN => -val doubled = math.abs(droppedDigits) * 2 -if (doubled > pow10diff || doubled == pow10diff && lv % 2 != 0) { - lv += (if (droppedDigits < 0) -1L else 1L) -} - case _ => -throw QueryExecutionErrors.unsupportedRoundingMode(roundMode) +// If diff is greater than max number of digits we store in Long, then +// value becomes 0. Otherwise we calculate new value dividing by power of 10. +// In both cases we apply rounding after that. 
+if (diff > MAX_LONG_DIGITS) { + lv = roundMode match { +case ROUND_FLOOR => if (lv < 0) -1L else 0L +case ROUND_CEILING => if (lv > 0) 1L else 0L +case ROUND_HALF_UP | ROUND_HALF_EVEN => 0L +case _ => throw QueryExecutionErrors.unsupportedRoundingMode(roundMode) + } +} else { + val pow10diff = POW_10(diff) + // % and / always round to 0 + val droppedDigits = lv % pow10diff + lv /= pow10diff + roundMode match { +case ROUND_FLOOR => + if (droppedDigits < 0) { +lv += -1L + } +case ROUND_CEILING => + if (droppedDigits > 0) { +lv += 1L + } +case ROUND_HALF_UP => + if (math.abs(droppedDigits) * 2 >= pow10diff) { +lv += (if (droppedDigits < 0) -1L else 1L) + } +case
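For reference, the reproduction quoted in the description can be run as a short PySpark snippet (a minimal sketch, assuming a local Spark session). On builds without this fix the outermost cast drops the scale by 20 digits in one step and fails with `java.lang.ArrayIndexOutOfBoundsException`; with the fix the values are simply rounded to the target scale.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("decimal-rescale-repro").getOrCreate()

# Scale goes 15 -> 30 -> 37 and then down to 17: a decrease of 20 digits,
# which is more than the 18 digits a compact-long Decimal can hold.
df = spark.sql(
    "select cast(cast(cast(cast(id as decimal(38,15)) as decimal(38,30)) "
    "as decimal(38,37)) as decimal(38,17)) from range(3)"
)
df.show()
```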
[spark] branch master updated: [SPARK-41578][SQL] Assign name to _LEGACY_ERROR_TEMP_2141
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7823f84942a [SPARK-41578][SQL] Assign name to _LEGACY_ERROR_TEMP_2141 7823f84942a is described below commit 7823f84942acd1a1a6abc5c1f9045317795d00fb Author: itholic AuthorDate: Fri Dec 30 12:18:50 2022 +0500 [SPARK-41578][SQL] Assign name to _LEGACY_ERROR_TEMP_2141 ### What changes were proposed in this pull request? This PR proposes to assign name to _LEGACY_ERROR_TEMP_2141, "ENCODER_NOT_FOUND". ### Why are the changes needed? We should assign proper name to _LEGACY_ERROR_TEMP_* ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*` Closes #39279 from itholic/LEGACY_2141. Authored-by: itholic Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 11 ++- .../spark/sql/catalyst/ScalaReflection.scala | 2 +- .../spark/sql/errors/QueryExecutionErrors.scala| 8 +-- .../encoders/EncoderErrorMessageSuite.scala| 80 ++ .../catalyst/encoders/ExpressionEncoderSuite.scala | 13 ++-- 5 files changed, 52 insertions(+), 62 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 21b7c467b64..67398a30180 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -459,6 +459,11 @@ "The index 0 is invalid. An index shall be either < 0 or > 0 (the first element has index 1)." ] }, + "ENCODER_NOT_FOUND" : { +"message" : [ + "Not found an encoder of the type to Spark SQL internal representation. 
Consider to change the input type to one of supported at https://spark.apache.org/docs/latest/sql-ref-datatypes.html.; +] + }, "FAILED_EXECUTE_UDF" : { "message" : [ "Failed to execute user defined function (: () => )" @@ -4116,12 +4121,6 @@ "" ] }, - "_LEGACY_ERROR_TEMP_2141" : { -"message" : [ - "No Encoder found for ", - "" -] - }, "_LEGACY_ERROR_TEMP_2142" : { "message" : [ "Attributes for type is not supported" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 0a8a823216f..e02e42cea1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -779,7 +779,7 @@ object ScalaReflection extends ScalaReflection { } ProductEncoder(ClassTag(getClassFromType(t)), params) case _ => -throw QueryExecutionErrors.cannotFindEncoderForTypeError(tpe.toString, path) +throw QueryExecutionErrors.cannotFindEncoderForTypeError(tpe.toString) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index cef4acafe07..3e234cfee2c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1483,13 +1483,11 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { "walkedTypePath" -> walkedTypePath.toString())) } - def cannotFindEncoderForTypeError( - tpe: String, walkedTypePath: WalkedTypePath): SparkUnsupportedOperationException = { + def cannotFindEncoderForTypeError(typeName: String): SparkUnsupportedOperationException = { new SparkUnsupportedOperationException( - errorClass = "_LEGACY_ERROR_TEMP_2141", + errorClass = "ENCODER_NOT_FOUND", messageParameters = Map( -"tpe" -> tpe, -"walkedTypePath" -> walkedTypePath.toString())) +"typeName" -> typeName)) } def attributesForTypeUnsupportedError(schema: Schema): SparkUnsupportedOperationException = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala index 8c766ef8299..501dfa58305 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.encoders import scala.reflect.ClassTag -import org.apache.spark.SparkFunSuite +import
[spark] branch master updated: [SPARK-41068][CONNECT][PYTHON] Implement `DataFrame.stat.corr`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a3c837ae2ea [SPARK-41068][CONNECT][PYTHON] Implement `DataFrame.stat.corr` a3c837ae2ea is described below commit a3c837ae2eaf2c7ba08563b7afa0f96df8a4e80b Author: Jiaan Geng AuthorDate: Fri Dec 30 13:09:55 2022 +0800 [SPARK-41068][CONNECT][PYTHON] Implement `DataFrame.stat.corr` ### What changes were proposed in this pull request? Implement `DataFrame.stat.corr` with a proto message Implement `DataFrame.stat.corr` for scala API Implement `DataFrame.stat.corr` for python API ### Why are the changes needed? for Connect API coverage ### Does this PR introduce _any_ user-facing change? 'No'. New API ### How was this patch tested? New test cases. Closes #39236 from beliefer/SPARK-41068. Authored-by: Jiaan Geng Signed-off-by: Ruifeng Zheng --- .../main/protobuf/spark/connect/relations.proto| 20 +++ .../org/apache/spark/sql/connect/dsl/package.scala | 16 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 14 ++ python/pyspark/sql/connect/dataframe.py| 27 +++ python/pyspark/sql/connect/plan.py | 18 ++ python/pyspark/sql/connect/proto/relations_pb2.py | 194 +++-- python/pyspark/sql/connect/proto/relations_pb2.pyi | 68 .../sql/tests/connect/test_connect_basic.py| 24 +++ 8 files changed, 291 insertions(+), 90 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto index 2d0837b4924..8a604f0702c 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto @@ -70,6 +70,7 @@ message Relation { StatCrosstab crosstab = 101; StatDescribe describe = 102; StatCov cov = 103; +StatCorr corr = 104; // Catalog API (experimental / unstable) Catalog catalog = 200; @@ -481,6 +482,25 @@ message StatCov { string col2 = 3; } +// Calculates the correlation of two columns of a DataFrame. Currently only supports the Pearson +// Correlation Coefficient. It will invoke 'Dataset.stat.corr' (same as +// 'StatFunctions.pearsonCorrelation') to compute the results. +message StatCorr { + // (Required) The input relation. + Relation input = 1; + + // (Required) The name of the first column. + string col1 = 2; + + // (Required) The name of the second column. + string col2 = 3; + + // (Optional) Default value is 'pearson'. + // + // Currently only supports the Pearson Correlation Coefficient. + optional string method = 4; +} + // Replaces null values. // It will invoke 'Dataset.na.fill' (same as 'DataFrameNaFunctions.fill') to compute the results. 
// Following 3 parameter combinations are supported: diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 9e3346d9364..3bd713a9710 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -387,6 +387,22 @@ package object dsl { .build() } + def corr(col1: String, col2: String, method: String): Relation = { +Relation + .newBuilder() + .setCorr( +proto.StatCorr + .newBuilder() + .setInput(logicalPlan) + .setCol1(col1) + .setCol2(col2) + .setMethod(method) + .build()) + .build() + } + + def corr(col1: String, col2: String): Relation = corr(col1, col2, "pearson") + def crosstab(col1: String, col2: String): Relation = { Relation .newBuilder() diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index d06787e6b14..bb582e92755 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -89,6 +89,7 @@ class SparkConnectPlanner(session: SparkSession) { case proto.Relation.RelTypeCase.SUMMARY => transformStatSummary(rel.getSummary) case proto.Relation.RelTypeCase.DESCRIBE => transformStatDescribe(rel.getDescribe) case proto.Relation.RelTypeCase.COV => transformStatCov(rel.getCov) + case
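As a usage reference, here is a minimal sketch of the API this wires up (illustrative data and column names; a plain local session behaves the same way as a Connect one):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, 10.0), (2, 19.0), (3, 31.0), (4, 42.0)], ["x", "y"]
)

# Pearson correlation coefficient between two columns. "pearson" is the
# default and, per the proto comment above, the only supported method today.
print(df.stat.corr("x", "y"))
print(df.stat.corr("x", "y", method="pearson"))
```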
[spark] branch master updated: [SPARK-41760][BUILD][CONNECT] Enforce scalafmt for Connect Client module
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new af8442ee903 [SPARK-41760][BUILD][CONNECT] Enforce scalafmt for Connect Client module af8442ee903 is described below commit af8442ee9036bcf5b864d863d7a918b8fe9dcafd Author: dengziming AuthorDate: Fri Dec 30 12:08:03 2022 +0800 [SPARK-41760][BUILD][CONNECT] Enforce scalafmt for Connect Client module ### What changes were proposed in this pull request? 1. This changes enables enforcing `scalafmt` for the Connect client module since it's a new module. 2. This change applies `scalafmt` on the existing code-base. ### Why are the changes needed? Faster, focussed code reviews. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Faster, focussed code reviews. Closes #39274 from dengziming/SPARK-41760. Authored-by: dengziming Signed-off-by: Wenchen Fan --- .../scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala | 3 ++- dev/lint-scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index beaae6412be..e188ef0d409 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -23,7 +23,8 @@ class SparkConnectClient(private val userContext: proto.UserContext) { /** * Placeholder method. - * @return User ID. + * @return + * User ID. */ def userId: String = userContext.getUserId() } diff --git a/dev/lint-scala b/dev/lint-scala index 48ecf57ef47..2549f775e49 100755 --- a/dev/lint-scala +++ b/dev/lint-scala @@ -30,13 +30,14 @@ ERRORS=$(./build/mvn \ -Dscalafmt.validateOnly=true \ -Dscalafmt.changedOnly=false \ -pl connector/connect/server \ +-pl connector/connect/client/jvm \ 2>&1 | grep -e "^Requires formatting" \ ) if test ! -z "$ERRORS"; then echo -e "The scalafmt check failed on connector/connect at following occurrences:\n\n$ERRORS\n" echo "Before submitting your change, please make sure to format your code using the following command:" - echo "./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false -Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl connector/connect/server" + echo "./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false -Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl connector/connect/server -pl connector/connect/client/jvm" exit 1 else echo -e "Scalafmt checks passed." - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (63a25dbea55 -> 8ff06e55824)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from 63a25dbea55 [SPARK-41743][CONNECT] Re-enable doc tests for group.py
   add 8ff06e55824 [SPARK-41774][PYTHON][TESTS] Remove duplicated `test_vectorized_udf_unsupported_types` test

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py | 9 -
 1 file changed, 9 deletions(-)
[spark] branch master updated (37500780f8d -> 63a25dbea55)
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from 37500780f8d [SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace
   add 63a25dbea55 [SPARK-41743][CONNECT] Re-enable doc tests for group.py

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/group.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch master updated: [SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 37500780f8d [SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace 37500780f8d is described below commit 37500780f8d3475aa6eec49036363bba852f1498 Author: Hyukjin Kwon AuthorDate: Fri Dec 30 10:23:23 2022 +0800 [SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace ### What changes were proposed in this pull request? This PR proposes to support Spark Connect's Window in `pyspark.sql.window` namespace. https://github.com/apache/spark/pull/39041 implemented the base, and https://github.com/apache/spark/pull/39149 implemented Spark Connect's Window. This PR connects them. ### Why are the changes needed? To provide the users the same usage, see also https://github.com/apache/spark/pull/39041. ### Does this PR introduce _any_ user-facing change? Yes, see also https://github.com/apache/spark/pull/39041. Spark Connect can use Window functions via the same namespace `pyspark.sql.window`. ### How was this patch tested? Manually checked the related unittests. Closes #39290 from HyukjinKwon/SPARK-41292. Authored-by: Hyukjin Kwon Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/column.py | 2 -- python/pyspark/sql/connect/window.py | 24 python/pyspark/sql/utils.py | 25 ++--- python/pyspark/sql/window.py | 20 +--- 4 files changed, 43 insertions(+), 28 deletions(-) diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py index 5025fc8e197..206d30b15d8 100644 --- a/python/pyspark/sql/connect/column.py +++ b/python/pyspark/sql/connect/column.py @@ -462,8 +462,6 @@ def _test() -> None: # TODO(SPARK-41771): __getitem__ does not work with Column.isin del pyspark.sql.connect.column.Column.getField.__doc__ del pyspark.sql.connect.column.Column.getItem.__doc__ -# TODO(SPARK-41758): Support Window functions -del pyspark.sql.connect.column.Column.over.__doc__ (failure_count, test_count) = doctest.testmod( pyspark.sql.connect.column, diff --git a/python/pyspark/sql/connect/window.py b/python/pyspark/sql/connect/window.py index c54157d0dbb..24b057022bf 100644 --- a/python/pyspark/sql/connect/window.py +++ b/python/pyspark/sql/connect/window.py @@ -113,8 +113,6 @@ class WindowSpec: frame=self._frame, ) -partitionBy.__doc__ = PySparkWindowSpec.partitionBy.__doc__ - def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec": _cols: List[ColumnOrName] = [] for col in cols: @@ -149,8 +147,6 @@ class WindowSpec: frame=self._frame, ) -orderBy.__doc__ = PySparkWindowSpec.orderBy.__doc__ - def rowsBetween(self, start: int, end: int) -> "WindowSpec": if not isinstance(start, int): raise TypeError(f"start must be a int, but got {type(start).__name__}") @@ -168,8 +164,6 @@ class WindowSpec: frame=WindowFrame(isRowFrame=True, start=start, end=end), ) -rowsBetween.__doc__ = PySparkWindowSpec.rowsBetween.__doc__ - def rangeBetween(self, start: int, end: int) -> "WindowSpec": if not isinstance(start, int): raise TypeError(f"start must be a int, but got {type(start).__name__}") @@ -187,8 +181,6 @@ class WindowSpec: frame=WindowFrame(isRowFrame=False, start=start, end=end), ) -rangeBetween.__doc__ = PySparkWindowSpec.rangeBetween.__doc__ - def __repr__(self) -> str: strs: List[str] = [] if len(self._partitionSpec) > 0: @@ -202,6 +194,10 @@ class WindowSpec: return "WindowSpec(" + ", ".join(strs) + 
")" +WindowSpec.rangeBetween.__doc__ = PySparkWindowSpec.rangeBetween.__doc__ +WindowSpec.rowsBetween.__doc__ = PySparkWindowSpec.rowsBetween.__doc__ +WindowSpec.orderBy.__doc__ = PySparkWindowSpec.orderBy.__doc__ +WindowSpec.partitionBy.__doc__ = PySparkWindowSpec.partitionBy.__doc__ WindowSpec.__doc__ = PySparkWindowSpec.__doc__ @@ -221,27 +217,23 @@ class Window: def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec": return Window._spec.partitionBy(*cols) -partitionBy.__doc__ = PySparkWindow.partitionBy.__doc__ - @staticmethod def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec": return Window._spec.orderBy(*cols) -orderBy.__doc__ = PySparkWindow.orderBy.__doc__ - @staticmethod def rowsBetween(start: int, end: int) -> "WindowSpec": return Window._spec.rowsBetween(start, end) -
[spark] branch master updated: [SPARK-41742] Support df.groupBy().agg({"*":"count"})
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2d028a2ec19 [SPARK-41742] Support df.groupBy().agg({"*":"count"}) 2d028a2ec19 is described below commit 2d028a2ec19f1a9e41e3b2e893c412bd28ab53a4 Author: Martin Grund AuthorDate: Fri Dec 30 10:22:00 2022 +0800 [SPARK-41742] Support df.groupBy().agg({"*":"count"}) ### What changes were proposed in this pull request? Compatibility changes to support `count(*)` for DF operations that are rewritten into `count(1)`. ### Why are the changes needed? Compatibility. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #39298 from grundprinzip/SPARK-41742. Authored-by: Martin Grund Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/group.py | 8 +++- python/pyspark/sql/group.py | 4 +--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/connect/group.py b/python/pyspark/sql/connect/group.py index 4c074d6da1b..fd6f9816e2d 100644 --- a/python/pyspark/sql/connect/group.py +++ b/python/pyspark/sql/connect/group.py @@ -80,8 +80,14 @@ class GroupedData: assert exprs, "exprs should not be empty" if len(exprs) == 1 and isinstance(exprs[0], dict): +# There is a special case for count(*) which is rewritten into count(1). # Convert the dict into key value pairs -aggregate_cols = [scalar_function(exprs[0][k], col(k)) for k in exprs[0]] +aggregate_cols = [ +scalar_function( +exprs[0][k], lit(1) if exprs[0][k] == "count" and k == "*" else col(k) +) +for k in exprs[0] +] else: # Columns assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column" diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index ac661e39741..10468988186 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -78,8 +78,6 @@ class GroupedData(PandasGroupedOpsMixin): def agg(self, __exprs: Dict[str, str]) -> DataFrame: ... -# TODO(SPARK-41279): Enable the doctest with supporting the star in Spark Connect. -# TODO(SPARK-41743): groupBy(...).agg(...).sort does not actually sort the output def agg(self, *exprs: Union[Column, Dict[str, str]]) -> DataFrame: """Compute aggregates and returns the result as a :class:`DataFrame`. @@ -135,7 +133,7 @@ class GroupedData(PandasGroupedOpsMixin): Group-by name, and count each group. ->>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show() # doctest: +SKIP +>>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show() +-++ | name|count(1)| +-++ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
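The user-facing behaviour, as a small sketch with illustrative data: the `"*": "count"` entry is rewritten into `count(1)`, so the output matches classic PySpark.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(2, "Alice"), (5, "Bob"), (5, "Bob")], ["age", "name"]
)

# The "*" key with the "count" aggregate is rewritten into count(1),
# which is why the resulting column is named count(1).
df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
```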
[spark] branch master updated (cfdbfb7349a -> ccbd9a7b98d)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from cfdbfb7349a [SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand` add ccbd9a7b98d [SPARK-41778][SQL] Add an alias "reduce" to ArrayAggregate No new revisions were added by this update. Summary of changes: .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../expressions/higherOrderFunctions.scala | 66 +--- .../sql-functions/sql-expression-schema.md | 1 + .../sql-tests/inputs/higher-order-functions.sql| 6 + .../results/ansi/higher-order-functions.sql.out| 38 + .../results/higher-order-functions.sql.out | 38 + .../apache/spark/sql/DataFrameFunctionsSuite.scala | 184 +++-- 7 files changed, 227 insertions(+), 107 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
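Since the digest above only lists the changed files, here is a hedged SQL-level sketch of what the new alias means: `reduce` behaves the same as the existing `aggregate` higher-order function (values are illustrative).

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# reduce is an alias for aggregate (ArrayAggregate): both fold the array
# into a single value with the given merge lambda. Both columns print 6.
spark.sql(
    "SELECT aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x) AS agg_sum, "
    "       reduce(array(1, 2, 3), 0, (acc, x) -> acc + x) AS reduce_sum"
).show()
```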
[spark] branch master updated: [SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cfdbfb7349a [SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand` cfdbfb7349a is described below commit cfdbfb7349a6c7765b0172c23f133d39196354b0 Author: ulysses-you AuthorDate: Thu Dec 29 17:02:00 2022 -0800 [SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand` ### What changes were proposed in this pull request? This pr removes `OptimizedCreateHiveTableAsSelectCommand` and move the code that tune `InsertIntoHiveTable` to `InsertIntoHadoopFsRelationCommand` into `RelationConversions`. ### Why are the changes needed? CTAS use a nested execution to do data writing, so it is unnecessary to have `OptimizedCreateHiveTableAsSelectCommand`. The inside `InsertIntoHiveTable` would be converted to `InsertIntoHadoopFsRelationCommand` if possible. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? fix test Closes #39263 from ulysses-you/SPARK-41726. Authored-by: ulysses-you Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/sql/hive/HiveStrategies.scala | 32 +- .../execution/CreateHiveTableAsSelectCommand.scala | 114 - .../sql/hive/execution/HiveExplainSuite.scala | 24 - .../spark/sql/hive/execution/SQLQuerySuite.scala | 98 ++ 4 files changed, 104 insertions(+), 164 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 42bf1e31bb0..af727f966e5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, InsertIntoDataSourceDirCommand} -import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy} +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation} import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec import org.apache.spark.sql.internal.HiveSerDe @@ -232,15 +233,36 @@ case class RelationConversions( if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) => metastoreCatalog.convert(relation, isWrite = false) - // CTAS - case CreateTable(tableDesc, mode, Some(query)) + // CTAS path + // This `InsertIntoHiveTable` is derived from `CreateHiveTableAsSelectCommand`, + // that only matches table insertion inside Hive CTAS. + // This pattern would not cause conflicts because this rule is always applied before + // `HiveAnalysis` and both of these rules are running once. 
+ case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _) if query.resolved && DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) && conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) => // validation is required to be done here before relation conversion. DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema)) -OptimizedCreateHiveTableAsSelectCommand( - tableDesc, query, query.output.map(_.name), mode) +val hiveTable = DDLUtils.readHiveTable(tableDesc) +val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) match { + case LogicalRelation(t: HadoopFsRelation, _, _, _) => t + case _ => throw QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError( +tableDesc.identifier) +} +InsertIntoHadoopFsRelationCommand( + hadoopRelation.location.rootPaths.head, + Map.empty, // We don't support to convert partitioned table. + ifPartitionNotExists, + Seq.empty, // We don't support to convert partitioned table. + hadoopRelation.bucketSpec, + hadoopRelation.fileFormat, + hadoopRelation.options, + query, + if (overwrite) SaveMode.Overwrite else SaveMode.Append, + Some(tableDesc), +
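For context, a hedged sketch of the kind of statement this code path covers: a non-partitioned Hive-format CTAS whose write is converted to the data source path. This assumes a Hive-enabled session and that `spark.sql.hive.convertMetastoreCtas` is enabled; the table name is illustrative.

```
from pyspark.sql import SparkSession

# Requires Hive support; conversion of the CTAS write path is governed by
# spark.sql.hive.convertMetastoreCtas (assumed enabled here).
spark = (
    SparkSession.builder.appName("hive-ctas-sketch")
    .enableHiveSupport()
    .getOrCreate()
)

# A non-partitioned Parquet CTAS. With this change the inner InsertIntoHiveTable
# is rewritten by RelationConversions into an InsertIntoHadoopFsRelationCommand
# instead of going through a dedicated OptimizedCreateHiveTableAsSelectCommand.
spark.sql(
    "CREATE TABLE ctas_demo STORED AS PARQUET "
    "AS SELECT id, id * 2 AS doubled FROM range(10)"
)
spark.table("ctas_demo").show()
```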
[spark] branch master updated: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new bb18703fdbf [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper bb18703fdbf is described below commit bb18703fdbfbe4f7887abebd75beb37af662d0f3 Author: Sandeep Singh AuthorDate: Thu Dec 29 16:14:07 2022 -0800 [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper ### What changes were proposed in this pull request? Add Protobuf serializer for RDDOperationGraphWrapper ### Why are the changes needed? Support fast and compact serialization/deserialization for RDDOperationGraphWrapper over RocksDB. ### Does this PR introduce any user-facing change? No ### How was this patch tested? New UT Closes #39110 from techaddict/SPARK-41429-RDDOperationGraphWrapper. Authored-by: Sandeep Singh Signed-off-by: Gengliang Wang --- .../apache/spark/status/protobuf/store_types.proto | 35 ++ .../org.apache.spark.status.protobuf.ProtobufSerDe | 1 + .../RDDOperationGraphWrapperSerializer.scala | 120 + .../protobuf/KVStoreProtobufSerializerSuite.scala | 84 +++ 4 files changed, 240 insertions(+) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 22e22eea1a2..e9150490746 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -421,3 +421,38 @@ message SparkPlanGraphWrapper { repeated SparkPlanGraphNodeWrapper nodes = 2; repeated SparkPlanGraphEdge edges = 3; } + +message RDDOperationEdge { + int32 from_id = 1; + int32 to_id = 2; +} + +message RDDOperationNode { + enum DeterministicLevel { +UNSPECIFIED = 0; +DETERMINATE = 1; +UNORDERED = 2; +INDETERMINATE = 3; + } + int32 id = 1; + string name = 2; + bool cached = 3; + bool barrier = 4; + string callsite = 5; + DeterministicLevel output_deterministic_level = 6; +} + +message RDDOperationClusterWrapper { + string id = 1; + string name = 2; + repeated RDDOperationNode child_nodes = 3; + repeated RDDOperationClusterWrapper child_clusters = 4; +} + +message RDDOperationGraphWrapper { + int64 stage_id = 1; + repeated RDDOperationEdge edges = 2; + repeated RDDOperationEdge outgoing_edges = 3; + repeated RDDOperationEdge incoming_edges = 4; + RDDOperationClusterWrapper root_cluster = 5; +} diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe index 39127e6a28c..4e39d9ecdc0 100644 --- a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe +++ b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe @@ -27,3 +27,4 @@ org.apache.spark.status.protobuf.ResourceProfileWrapperSerializer org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer org.apache.spark.status.protobuf.ProcessSummaryWrapperSerializer +org.apache.spark.status.protobuf.RDDOperationGraphWrapperSerializer diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala new file mode 100644 index 
000..8975062082c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.protobuf + +import scala.collection.JavaConverters._ + +import org.apache.spark.rdd.DeterministicLevel +import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper} +import
[spark] branch master updated: [SQL][MINOR] Use Diamond operator for constructing HashMap
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 18488158bee [SQL][MINOR] Use Diamond operator for constructing HashMap 18488158bee is described below commit 18488158beee5435f99899f99b2e90fb6e37f3d5 Author: Ted Yu AuthorDate: Thu Dec 29 08:07:50 2022 -0600 [SQL][MINOR] Use Diamond operator for constructing HashMap ### What changes were proposed in this pull request? This PR uses Diamond operator for constructing HashMap and Tuple2 for type inference. ### Why are the changes needed? The change follows Java practices for creating new HashMap. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing test suite. Closes #39250 from tedyu/hash-map. Authored-by: Ted Yu Signed-off-by: Sean Owen --- .../test/org/apache/spark/sql/JavaBeanDeserializationSuite.java | 6 +++--- .../java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java index da626b4d873..66c985bdda0 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java @@ -590,9 +590,9 @@ public class JavaBeanDeserializationSuite implements Serializable { .reduceGroups(rf); List> expectedRecords = Arrays.asList( -new Tuple2("a", new Item("a", 8)), -new Tuple2("b", new Item("b", 3)), -new Tuple2("c", new Item("c", 2))); +new Tuple2<>("a", new Item("a", 8)), +new Tuple2<>("b", new Item("b", 3)), +new Tuple2<>("c", new Item("c", 2))); List> result = finalDs.collectAsList(); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java index 1836cc403c3..bee77616b7e 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java @@ -86,7 +86,7 @@ public class JavaColumnExpressionSuite { AnalysisException e = Assert.assertThrows(AnalysisException.class, () -> df.filter(df.col("a").isInCollection(Arrays.asList(new Column("b"); Assert.assertTrue(e.getErrorClass().equals("DATATYPE_MISMATCH.DATA_DIFF_TYPES")); -Map messageParameters = new HashMap(); +Map messageParameters = new HashMap<>(); messageParameters.put("functionName", "`in`"); messageParameters.put("dataType", "[\"INT\", \"ARRAY\"]"); messageParameters.put("sqlExpr", "\"(a IN (b))\""); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (908218ee149 -> a4a800727fc)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from 908218ee149 [SPARK-41767][CONNECT][TESTS][FOLLOW-UP] Disable the doctests for dropFields and withField
   add a4a800727fc [SPARK-41655][CONNECT][DOCS][FOLLOW-UP] Update related JIRAs for skipped tests

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/column.py | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
[spark] branch master updated: [SPARK-41767][CONNECT][TESTS][FOLLOW-UP] Disable the doctests for dropFields and withField
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 908218ee149 [SPARK-41767][CONNECT][TESTS][FOLLOW-UP] Disable the doctests for dropFields and withField 908218ee149 is described below commit 908218ee149b9faf1738a3389781e7a2c825b8a8 Author: Hyukjin Kwon AuthorDate: Thu Dec 29 21:41:07 2022 +0900 [SPARK-41767][CONNECT][TESTS][FOLLOW-UP] Disable the doctests for dropFields and withField ### What changes were proposed in this pull request? There is a logical conflict between https://github.com/apache/spark/pull/39249 and https://github.com/apache/spark/pull/39283. This PR fixes it with filing a related JIRA. ### Why are the changes needed? To recover the build. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Manually tested via: ```bash ./python/run-tests --testnames 'pyspark.sql.connect.column' ``` Closes #39288 from HyukjinKwon/SPARK-41767. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/column.py | 4 1 file changed, 4 insertions(+) diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py index 2667e795974..39de3cdd126 100644 --- a/python/pyspark/sql/connect/column.py +++ b/python/pyspark/sql/connect/column.py @@ -443,6 +443,10 @@ def _test() -> None: os.environ["SPARK_REMOTE"] = "sc://localhost" globs["spark"] = PySparkSession.builder.remote("sc://localhost").getOrCreate() +# TODO(SPARK-41746): SparkSession.createDataFrame does not support nested datatypes +del pyspark.sql.connect.column.Column.dropFields.__doc__ +# TODO(SPARK-41772): Enable pyspark.sql.connect.column.Column.withField doctest +del pyspark.sql.connect.column.Column.withField.__doc__ # TODO(SPARK-41751): Support Column.bitwiseAND,bitwiseOR,bitwiseXOR,eqNullSafe,isNotNull, # isNull,isin del pyspark.sql.connect.column.Column.bitwiseAND.__doc__ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41767][CONNECT][PYTHON] Implement `Column.{withField, dropFields}`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 86f6dde3079 [SPARK-41767][CONNECT][PYTHON] Implement `Column.{withField, dropFields}` 86f6dde3079 is described below commit 86f6dde30798e69c7a953ee59788a4a9831b37cd Author: Ruifeng Zheng AuthorDate: Thu Dec 29 20:57:01 2022 +0900 [SPARK-41767][CONNECT][PYTHON] Implement `Column.{withField, dropFields}` ### What changes were proposed in this pull request? Implement `Column.{withField, dropFields}` ### Why are the changes needed? For API coverage ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? added UT Closes #39283 from zhengruifeng/connect_column_field. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- .../main/protobuf/spark/connect/expressions.proto | 15 +++ .../sql/connect/planner/SparkConnectPlanner.scala | 17 +++ python/pyspark/sql/column.py | 6 + python/pyspark/sql/connect/column.py | 37 +- python/pyspark/sql/connect/expressions.py | 53 .../pyspark/sql/connect/proto/expressions_pb2.py | 93 +++-- .../pyspark/sql/connect/proto/expressions_pb2.pyi | 53 .../sql/tests/connect/test_connect_column.py | 147 +++-- 8 files changed, 366 insertions(+), 55 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto b/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto index b8ed9eb6f23..fa2836702c6 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto @@ -41,6 +41,7 @@ message Expression { LambdaFunction lambda_function = 10; Window window = 11; UnresolvedExtractValue unresolved_extract_value = 12; +UpdateFields update_fields = 13; } @@ -241,6 +242,20 @@ message Expression { Expression extraction = 2; } + // Add, replace or drop a field of `StructType` expression by name. + message UpdateFields { +// (Required) The struct expression. +Expression struct_expression = 1; + +// (Required) The field name. +string field_name = 2; + +// (Optional) The expression to add or replace. +// +// When not set, it means this field will be dropped. +Expression value_expression = 3; + } + message Alias { // (Required) The expression that alias will be added on. 
Expression expr = 1; diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 4bb90fc5bc0..d06787e6b14 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -596,6 +596,8 @@ class SparkConnectPlanner(session: SparkSession) { transformUnresolvedRegex(exp.getUnresolvedRegex) case proto.Expression.ExprTypeCase.UNRESOLVED_EXTRACT_VALUE => transformUnresolvedExtractValue(exp.getUnresolvedExtractValue) + case proto.Expression.ExprTypeCase.UPDATE_FIELDS => +transformUpdateFields(exp.getUpdateFields) case proto.Expression.ExprTypeCase.SORT_ORDER => transformSortOrder(exp.getSortOrder) case proto.Expression.ExprTypeCase.LAMBDA_FUNCTION => transformLambdaFunction(exp.getLambdaFunction) @@ -860,6 +862,21 @@ class SparkConnectPlanner(session: SparkSession) { transformExpression(extract.getExtraction)) } + private def transformUpdateFields(update: proto.Expression.UpdateFields): UpdateFields = { +if (update.hasValueExpression) { + // add or replace a field + UpdateFields.apply( +col = transformExpression(update.getStructExpression), +fieldName = update.getFieldName, +expr = transformExpression(update.getValueExpression)) +} else { + // drop a field + UpdateFields.apply( +col = transformExpression(update.getStructExpression), +fieldName = update.getFieldName) +} + } + private def transformWindowExpression(window: proto.Expression.Window) = { if (!window.hasWindowFunction) { throw InvalidPlanInput(s"WindowFunction is required in WindowExpression") diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index 5a0987b4cfe..cd7b6932c2f 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -522,6 +522,9 @@ class Column: .. versionadded:: 3.1.0 +.. versionchanged:: 3.4.0
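A brief usage sketch of the Column API that the new `UpdateFields` message carries over Connect (struct contents and field names are illustrative):

```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.range(1).select(
    F.struct(F.lit(1).alias("a"), F.lit(2).alias("b")).alias("s")
)

# withField adds or replaces a field of a struct column; dropFields removes one.
# When no value expression is sent, the proto above means "drop this field".
df.select(
    df["s"].withField("c", F.lit(3)).alias("with_c"),
    df["s"].dropFields("b").alias("without_b"),
).show(truncate=False)
```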
[spark] branch master updated: [SPARK-41655][CONNECT] Enable doctests in pyspark.sql.connect.column
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 48c5843ad4b [SPARK-41655][CONNECT] Enable doctests in pyspark.sql.connect.column 48c5843ad4b is described below commit 48c5843ad4b06e85e07d1db5b308a460209f6126 Author: Sandeep Singh AuthorDate: Thu Dec 29 20:52:35 2022 +0900 [SPARK-41655][CONNECT] Enable doctests in pyspark.sql.connect.column ### What changes were proposed in this pull request? This PR proposes to enable doctests in pyspark.sql.connect.column that is virtually the same as pyspark.sql.column. ### Why are the changes needed? To make sure on the PySpark compatibility and test coverage. ### Does this PR introduce any user-facing change? No, doctest's only. ### How was this patch tested? New Doctests Added Closes #39249 from techaddict/SPARK-41655-pyspark.sql.connect.column. Lead-authored-by: Sandeep Singh Co-authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- dev/sparktestsupport/modules.py | 1 + python/pyspark/sql/column.py | 15 - python/pyspark/sql/connect/column.py | 60 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 558d058f3e5..df3a1f180fc 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -507,6 +507,7 @@ pyspark_connect = Module( "pyspark.sql.connect.catalog", "pyspark.sql.connect.group", "pyspark.sql.connect.window", +"pyspark.sql.connect.column", # unittests "pyspark.sql.tests.connect.test_connect_column_expressions", "pyspark.sql.tests.connect.test_connect_plan_only", diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index 3bc49ef8031..5a0987b4cfe 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -182,6 +182,9 @@ def _reverse_op( return _ +# TODO(SPARK-41757): Compatibility of string representation for Column + + class Column: """ @@ -200,17 +203,16 @@ class Column: ... [(2, "Alice"), (5, "Bob")], ["age", "name"]) Select a column out of a DataFrame - ->>> df.name +>>> df.name # doctest: +SKIP Column<'name'> ->>> df["name"] +>>> df["name"] # doctest: +SKIP Column<'name'> Create from an expression ->>> df.age + 1 +>>> df.age + 1 # doctest: +SKIP Column<'(age + 1)'> ->>> 1 / df.age +>>> 1 / df.age # doctest: +SKIP Column<'(1 / age)'> """ @@ -1258,8 +1260,7 @@ class Column: >>> from pyspark.sql import Window >>> window = Window.partitionBy("name").orderBy("age") \ .rowsBetween(Window.unboundedPreceding, Window.currentRow) ->>> from pyspark.sql.functions import rank, min ->>> from pyspark.sql.functions import desc +>>> from pyspark.sql.functions import rank, min, desc >>> df = spark.createDataFrame( ... 
[(2, "Alice"), (5, "Bob")], ["age", "name"]) >>> df.withColumn("rank", rank().over(window)) \ diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py index b873a757e41..58d86a3d389 100644 --- a/python/pyspark/sql/connect/column.py +++ b/python/pyspark/sql/connect/column.py @@ -28,6 +28,7 @@ from typing import ( Optional, ) +from pyspark import SparkContext, SparkConf from pyspark.sql.types import DataType from pyspark.sql.column import Column as PySparkColumn @@ -390,3 +391,62 @@ class Column: Column.__doc__ = PySparkColumn.__doc__ + + +def _test() -> None: +import os +import sys +import doctest +from pyspark.sql import SparkSession as PySparkSession +from pyspark.testing.connectutils import should_test_connect, connect_requirement_message + +os.chdir(os.environ["SPARK_HOME"]) + +if should_test_connect: +import pyspark.sql.connect.column + +globs = pyspark.sql.connect.column.__dict__.copy() +# Works around to create a regular Spark session +sc = SparkContext("local[4]", "sql.connect.column tests", conf=SparkConf()) +globs["_spark"] = PySparkSession(sc, options={"spark.app.name": "sql.connect.column tests"}) + +# Creates a remote Spark session. +os.environ["SPARK_REMOTE"] = "sc://localhost" +globs["spark"] = PySparkSession.builder.remote("sc://localhost").getOrCreate() + +# TODO(SPARK-41751): Support Column.bitwiseAND,bitwiseOR,bitwiseXOR,eqNullSafe,isNotNull, +# isNull,isin +del pyspark.sql.connect.column.Column.bitwiseAND.__doc__ +del pyspark.sql.connect.column.Column.bitwiseOR.__doc__ +del
[spark] branch master updated: [SPARK-41344][SQL] Make error clearer when table not found in SupportsCatalogOptions catalog
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b331ad0ae2b [SPARK-41344][SQL] Make error clearer when table not found in SupportsCatalogOptions catalog b331ad0ae2b is described below commit b331ad0ae2b6ab566aea2ddbdbddcd3d28f8eaa1 Author: Zhen Wang <643348...@qq.com> AuthorDate: Thu Dec 29 20:25:17 2022 +0900 [SPARK-41344][SQL] Make error clearer when table not found in SupportsCatalogOptions catalog ### What changes were proposed in this pull request? Make error clearer when table not found in SupportsCatalogOptions catalog. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #38871 from wForget/SPARK-41344. Lead-authored-by: Zhen Wang <643348...@qq.com> Co-authored-by: wForget <643348...@qq.com> Signed-off-by: Hyukjin Kwon --- .../sql/connector/catalog/CatalogV2Util.scala | 27 ++ .../datasources/v2/DataSourceV2Utils.scala | 2 +- .../connector/SupportsCatalogOptionsSuite.scala| 9 +++- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index abd43065048..72c557c8d77 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -330,22 +330,29 @@ private[sql] object CatalogV2Util { ident: Identifier, timeTravelSpec: Option[TimeTravelSpec] = None): Option[Table] = try { - if (timeTravelSpec.nonEmpty) { -timeTravelSpec.get match { - case v: AsOfVersion => -Option(catalog.asTableCatalog.loadTable(ident, v.version)) - case ts: AsOfTimestamp => -Option(catalog.asTableCatalog.loadTable(ident, ts.timestamp)) -} - } else { -Option(catalog.asTableCatalog.loadTable(ident)) - } + Option(getTable(catalog, ident, timeTravelSpec)) } catch { case _: NoSuchTableException => None case _: NoSuchDatabaseException => None case _: NoSuchNamespaceException => None } + def getTable( + catalog: CatalogPlugin, + ident: Identifier, + timeTravelSpec: Option[TimeTravelSpec] = None): Table = { +if (timeTravelSpec.nonEmpty) { + timeTravelSpec.get match { +case v: AsOfVersion => + catalog.asTableCatalog.loadTable(ident, v.version) +case ts: AsOfTimestamp => + catalog.asTableCatalog.loadTable(ident, ts.timestamp) + } +} else { + catalog.asTableCatalog.loadTable(ident) +} + } + def loadFunction(catalog: CatalogPlugin, ident: Identifier): Option[UnboundFunction] = { try { Option(catalog.asFunctionCatalog.loadFunction(ident)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index f1d1cc5a173..c906e42a9b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -134,7 +134,7 @@ private[sql] object DataSourceV2Utils extends Logging { None } val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion, conf) -(CatalogV2Util.loadTable(catalog, ident, timeTravel).get, Some(catalog), Some(ident)) +(CatalogV2Util.getTable(catalog, 
ident, timeTravel), Some(catalog), Some(ident)) case _ => // TODO: Non-catalog paths for DSV2 are currently not well defined. val tbl = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala index f8278d18b0a..fd4f719417e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode} -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
[spark] branch master updated (febe5418f49 -> 054a7845497)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from febe5418f49 [SPARK-41761][CONNECT][PYTHON] Fix arithmetic ops: `__neg__`, `__pow__`, `__rpow__`
   add 054a7845497 [SPARK-41764][CONNECT][PYTHON] Make the internal string op name consistent with FunctionRegistry

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/column.py | 4 +-
 .../sql/tests/connect/test_connect_column.py | 46 ++
 2 files changed, 48 insertions(+), 2 deletions(-)
[spark] branch master updated (4c7f8106bf1 -> febe5418f49)
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from 4c7f8106bf1 [SPARK-41751][CONNECT][PYTHON] Fix `Column.{isNull, isNotNull, eqNullSafe}`
   add febe5418f49 [SPARK-41761][CONNECT][PYTHON] Fix arithmetic ops: `__neg__`, `__pow__`, `__rpow__`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/column.py | 6 +--
 .../sql/tests/connect/test_connect_column.py | 50 ++
 2 files changed, 53 insertions(+), 3 deletions(-)
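The digest only lists the changed files; as a hedged sketch, these are the Column operators the fix covers (illustrative data):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(2.0,), (3.0,)], ["x"])

# __neg__, __pow__ and __rpow__ on a Column: -x, x ** 2 and 2 ** x.
df.select(
    (-df.x).alias("neg"),
    (df.x ** 2).alias("pow"),
    (2 ** df.x).alias("rpow"),
).show()
```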
[spark] branch master updated: [SPARK-41751][CONNECT][PYTHON] Fix `Column.{isNull, isNotNull, eqNullSafe}`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4c7f8106bf1 [SPARK-41751][CONNECT][PYTHON] Fix `Column.{isNull, isNotNull, eqNullSafe}` 4c7f8106bf1 is described below commit 4c7f8106bf145203d0b1aed5f6d5762e915c83ca Author: Ruifeng Zheng AuthorDate: Thu Dec 29 16:39:10 2022 +0800 [SPARK-41751][CONNECT][PYTHON] Fix `Column.{isNull, isNotNull, eqNullSafe}` ### What changes were proposed in this pull request? Fix `Column.{isNull, isNotNull, eqNullSafe}` ### Why are the changes needed? they were wrongly implemented ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? added UT Closes #39273 from zhengruifeng/connect_column_fix_null. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/column.py | 6 ++-- .../sql/tests/connect/test_connect_column.py | 40 ++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py index 30916ecafbf..5248c1c0ab5 100644 --- a/python/pyspark/sql/connect/column.py +++ b/python/pyspark/sql/connect/column.py @@ -124,7 +124,7 @@ class Column: __ge__ = _bin_op(">=") __le__ = _bin_op("<=") -eqNullSafe = _bin_op("eqNullSafe", PySparkColumn.eqNullSafe.__doc__) +eqNullSafe = _bin_op("<=>", PySparkColumn.eqNullSafe.__doc__) __neg__ = _func_op("negate") @@ -148,8 +148,8 @@ class Column: bitwiseAND = _bin_op("&", PySparkColumn.bitwiseAND.__doc__) bitwiseXOR = _bin_op("^", PySparkColumn.bitwiseXOR.__doc__) -isNull = _unary_op("isNull", PySparkColumn.isNull.__doc__) -isNotNull = _unary_op("isNotNull", PySparkColumn.isNotNull.__doc__) +isNull = _unary_op("isnull", PySparkColumn.isNull.__doc__) +isNotNull = _unary_op("isnotnull", PySparkColumn.isNotNull.__doc__) def __ne__( # type: ignore[override] self, diff --git a/python/pyspark/sql/tests/connect/test_connect_column.py b/python/pyspark/sql/tests/connect/test_connect_column.py index 7eb30505504..e34f27aee98 100644 --- a/python/pyspark/sql/tests/connect/test_connect_column.py +++ b/python/pyspark/sql/tests/connect/test_connect_column.py @@ -112,6 +112,46 @@ class SparkConnectTests(SparkConnectSQLTestCase): df4.filter(df4.name.isNotNull()).toPandas(), ) +def test_column_with_null(self): +# SPARK-41751: test isNull, isNotNull, eqNullSafe +from pyspark.sql import functions as SF +from pyspark.sql.connect import functions as CF + +query = """ +SELECT * FROM VALUES +(1, 1, NULL), (2, NULL, NULL), (3, 3, 1) +AS tab(a, b, c) +""" + +# +---+++ +# | a| b| c| +# +---+++ +# | 1| 1|null| +# | 2|null|null| +# | 3| 3| 1| +# +---+++ + +cdf = self.connect.sql(query) +sdf = self.spark.sql(query) + +# test isNull +self.assert_eq( +cdf.select(cdf.a.isNull(), cdf["b"].isNull(), CF.col("c").isNull()).toPandas(), +sdf.select(sdf.a.isNull(), sdf["b"].isNull(), SF.col("c").isNull()).toPandas(), +) + +# test isNotNull +self.assert_eq( +cdf.select(cdf.a.isNotNull(), cdf["b"].isNotNull(), CF.col("c").isNotNull()).toPandas(), +sdf.select(sdf.a.isNotNull(), sdf["b"].isNotNull(), SF.col("c").isNotNull()).toPandas(), +) + +# test eqNullSafe +self.assert_eq( +cdf.select(cdf.a.eqNullSafe(cdf.b), cdf["b"].eqNullSafe(CF.col("c"))).toPandas(), +sdf.select(sdf.a.eqNullSafe(sdf.b), sdf["b"].eqNullSafe(SF.col("c"))).toPandas(), +) + def test_invalid_ops(self): query = """ SELECT * FROM VALUES - To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
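For quick reference outside the test diff, a hedged sketch of the user-facing calls this fix targets, on data containing nulls:

```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.sql(
    "SELECT * FROM VALUES (1, 1, NULL), (2, NULL, NULL), (3, 3, 1) AS tab(a, b, c)"
)

# isNull/isNotNull now map to the isnull/isnotnull functions and eqNullSafe
# to the null-safe equality operator <=>.
df.select(
    df.a.isNull().alias("a_is_null"),
    df.b.isNotNull().alias("b_is_not_null"),
    df.b.eqNullSafe(F.col("c")).alias("b_eq_c_null_safe"),
).show()
```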