[spark] branch master updated: [SPARK-41554] fix changing of Decimal scale when scale decreased by m…

2022-12-29 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 838954e5080 [SPARK-41554] fix changing of Decimal scale when scale 
decreased by m…
838954e5080 is described below

commit 838954e50807e583ceb8317877710d58acff0a4b
Author: oleksii.diagiliev 
AuthorDate: Fri Dec 30 15:52:05 2022 +0800

[SPARK-41554] fix changing of Decimal scale when scale decreased by more than 18

### What changes were proposed in this pull request?
Fix `Decimal` rescaling of values stored internally as a compact long when the scale is decreased by more than 18. For example,
```
Decimal(1, 38, 19).changePrecision(38, 0)
```
produces an exception
```
java.lang.ArrayIndexOutOfBoundsException: 19
at org.apache.spark.sql.types.Decimal.changePrecision(Decimal.scala:377)
at org.apache.spark.sql.types.Decimal.changePrecision(Decimal.scala:328)
```
Another way to reproduce it with SQL query
```
sql("select cast(cast(cast(cast(id as decimal(38,15)) as decimal(38,30)) as 
decimal(38,37)) as decimal(38,17)) from range(3)").show
```

The bug exists only for Decimal values stored using the compact long representation; Decimal values that use `scala.math.BigDecimal` internally work fine.
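
For reference, a minimal PySpark sketch of the reproduction above (session setup assumed); with this fix the query completes instead of raising `ArrayIndexOutOfBoundsException`:
```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The chained casts decrease the Decimal scale from 37 to 17, i.e. by more than 18,
# which previously hit the compact-long code path and threw ArrayIndexOutOfBoundsException.
spark.sql(
    "select cast(cast(cast(cast(id as decimal(38,15)) as decimal(38,30)) "
    "as decimal(38,37)) as decimal(38,17)) from range(3)"
).show()
```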

### Why are the changes needed?
The SQL query mentioned above cannot be executed. Please note that in my use 
case the SQL query is generated programmatically, so I cannot optimize it 
manually.

### Does this PR introduce _any_ user-facing change?

Yes, it allows Decimal values to be rescaled properly, which is currently not 
possible due to the exception.

### How was this patch tested?
Tests were added. The fix affects scale decreases only, but I also included 
tests for scale increases since I did not find any existing ones.

Closes #39099 from fe2s/SPARK-41554-fix-decimal-scaling.

Authored-by: oleksii.diagiliev 
Signed-off-by: Wenchen Fan 
---
 .../scala/org/apache/spark/sql/types/Decimal.scala | 60 +-
 .../org/apache/spark/sql/types/DecimalSuite.scala  | 52 ++-
 2 files changed, 87 insertions(+), 25 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 44c00df379f..2c0b6677541 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -374,30 +374,42 @@ final class Decimal extends Ordered[Decimal] with 
Serializable {
   if (scale < _scale) {
 // Easier case: we just need to divide our scale down
 val diff = _scale - scale
-val pow10diff = POW_10(diff)
-// % and / always round to 0
-val droppedDigits = lv % pow10diff
-lv /= pow10diff
-roundMode match {
-  case ROUND_FLOOR =>
-if (droppedDigits < 0) {
-  lv += -1L
-}
-  case ROUND_CEILING =>
-if (droppedDigits > 0) {
-  lv += 1L
-}
-  case ROUND_HALF_UP =>
-if (math.abs(droppedDigits) * 2 >= pow10diff) {
-  lv += (if (droppedDigits < 0) -1L else 1L)
-}
-  case ROUND_HALF_EVEN =>
-val doubled = math.abs(droppedDigits) * 2
-if (doubled > pow10diff || doubled == pow10diff && lv % 2 != 0) {
-  lv += (if (droppedDigits < 0) -1L else 1L)
-}
-  case _ =>
-throw QueryExecutionErrors.unsupportedRoundingMode(roundMode)
+// If diff is greater than max number of digits we store in Long, then
+// value becomes 0. Otherwise we calculate new value dividing by power of 10.
+// In both cases we apply rounding after that.
+if (diff > MAX_LONG_DIGITS) {
+  lv = roundMode match {
+    case ROUND_FLOOR => if (lv < 0) -1L else 0L
+    case ROUND_CEILING => if (lv > 0) 1L else 0L
+    case ROUND_HALF_UP | ROUND_HALF_EVEN => 0L
+    case _ => throw QueryExecutionErrors.unsupportedRoundingMode(roundMode)
+  }
+} else {
+  val pow10diff = POW_10(diff)
+  // % and / always round to 0
+  val droppedDigits = lv % pow10diff
+  lv /= pow10diff
+  roundMode match {
+case ROUND_FLOOR =>
+  if (droppedDigits < 0) {
+lv += -1L
+  }
+case ROUND_CEILING =>
+  if (droppedDigits > 0) {
+lv += 1L
+  }
+case ROUND_HALF_UP =>
+  if (math.abs(droppedDigits) * 2 >= pow10diff) {
+lv += (if (droppedDigits < 0) -1L else 1L)
+  }
+case 

[spark] branch master updated: [SPARK-41578][SQL] Assign name to _LEGACY_ERROR_TEMP_2141

2022-12-29 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7823f84942a [SPARK-41578][SQL] Assign name to _LEGACY_ERROR_TEMP_2141
7823f84942a is described below

commit 7823f84942acd1a1a6abc5c1f9045317795d00fb
Author: itholic 
AuthorDate: Fri Dec 30 12:18:50 2022 +0500

[SPARK-41578][SQL] Assign name to _LEGACY_ERROR_TEMP_2141

### What changes were proposed in this pull request?

This PR proposes to assign the name "ENCODER_NOT_FOUND" to _LEGACY_ERROR_TEMP_2141.

### Why are the changes needed?

We should assign proper names to the _LEGACY_ERROR_TEMP_* error classes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`

Closes #39279 from itholic/LEGACY_2141.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 11 ++-
 .../spark/sql/catalyst/ScalaReflection.scala   |  2 +-
 .../spark/sql/errors/QueryExecutionErrors.scala|  8 +--
 .../encoders/EncoderErrorMessageSuite.scala| 80 ++
 .../catalyst/encoders/ExpressionEncoderSuite.scala | 13 ++--
 5 files changed, 52 insertions(+), 62 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 21b7c467b64..67398a30180 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -459,6 +459,11 @@
   "The index 0 is invalid. An index shall be either < 0 or > 0 (the first 
element has index 1)."
 ]
   },
+  "ENCODER_NOT_FOUND" : {
+"message" : [
+  "Not found an encoder of the type  to Spark SQL internal 
representation. Consider to change the input type to one of supported at 
https://spark.apache.org/docs/latest/sql-ref-datatypes.html.;
+]
+  },
   "FAILED_EXECUTE_UDF" : {
 "message" : [
   "Failed to execute user defined function (: () 
=> )"
@@ -4116,12 +4121,6 @@
   ""
 ]
   },
-  "_LEGACY_ERROR_TEMP_2141" : {
-"message" : [
-  "No Encoder found for ",
-  ""
-]
-  },
   "_LEGACY_ERROR_TEMP_2142" : {
 "message" : [
   "Attributes for type  is not supported"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 0a8a823216f..e02e42cea1a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -779,7 +779,7 @@ object ScalaReflection extends ScalaReflection {
 }
 ProductEncoder(ClassTag(getClassFromType(t)), params)
   case _ =>
-throw QueryExecutionErrors.cannotFindEncoderForTypeError(tpe.toString, 
path)
+throw QueryExecutionErrors.cannotFindEncoderForTypeError(tpe.toString)
 }
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index cef4acafe07..3e234cfee2c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1483,13 +1483,11 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
 "walkedTypePath" -> walkedTypePath.toString()))
   }
 
-  def cannotFindEncoderForTypeError(
-      tpe: String, walkedTypePath: WalkedTypePath): SparkUnsupportedOperationException = {
+  def cannotFindEncoderForTypeError(typeName: String): SparkUnsupportedOperationException = {
 new SparkUnsupportedOperationException(
-  errorClass = "_LEGACY_ERROR_TEMP_2141",
+  errorClass = "ENCODER_NOT_FOUND",
   messageParameters = Map(
-"tpe" -> tpe,
-"walkedTypePath" -> walkedTypePath.toString()))
+"typeName" -> typeName))
   }
 
   def attributesForTypeUnsupportedError(schema: Schema): 
SparkUnsupportedOperationException = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
index 8c766ef8299..501dfa58305 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.encoders
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.SparkFunSuite
+import 

[spark] branch master updated: [SPARK-41068][CONNECT][PYTHON] Implement `DataFrame.stat.corr`

2022-12-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a3c837ae2ea [SPARK-41068][CONNECT][PYTHON] Implement 
`DataFrame.stat.corr`
a3c837ae2ea is described below

commit a3c837ae2eaf2c7ba08563b7afa0f96df8a4e80b
Author: Jiaan Geng 
AuthorDate: Fri Dec 30 13:09:55 2022 +0800

[SPARK-41068][CONNECT][PYTHON] Implement `DataFrame.stat.corr`

### What changes were proposed in this pull request?
Implement `DataFrame.stat.corr` with a proto message

Implement `DataFrame.stat.corr` for scala API
Implement `DataFrame.stat.corr` for python API
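
A minimal usage sketch (assuming an active session `spark`, either Spark Connect or classic PySpark; the column names are illustrative):
```
df = spark.range(10).selectExpr("id as a", "id * 2 as b")

# Pearson correlation coefficient between the two columns (1.0 for this data).
print(df.stat.corr("a", "b"))
```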

### Why are the changes needed?
for Connect API coverage

### Does this PR introduce _any_ user-facing change?
'No'. New API

### How was this patch tested?
New test cases.

Closes #39236 from beliefer/SPARK-41068.

Authored-by: Jiaan Geng 
Signed-off-by: Ruifeng Zheng 
---
 .../main/protobuf/spark/connect/relations.proto|  20 +++
 .../org/apache/spark/sql/connect/dsl/package.scala |  16 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  14 ++
 python/pyspark/sql/connect/dataframe.py|  27 +++
 python/pyspark/sql/connect/plan.py |  18 ++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 194 +++--
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  68 
 .../sql/tests/connect/test_connect_basic.py|  24 +++
 8 files changed, 291 insertions(+), 90 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 2d0837b4924..8a604f0702c 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -70,6 +70,7 @@ message Relation {
 StatCrosstab crosstab = 101;
 StatDescribe describe = 102;
 StatCov cov = 103;
+StatCorr corr = 104;
 
 // Catalog API (experimental / unstable)
 Catalog catalog = 200;
@@ -481,6 +482,25 @@ message StatCov {
   string col2 = 3;
 }
 
+// Calculates the correlation of two columns of a DataFrame. Currently only supports the Pearson
+// Correlation Coefficient. It will invoke 'Dataset.stat.corr' (same as
+// 'StatFunctions.pearsonCorrelation') to compute the results.
+message StatCorr {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) The name of the first column.
+  string col1 = 2;
+
+  // (Required) The name of the second column.
+  string col2 = 3;
+
+  // (Optional) Default value is 'pearson'.
+  //
+  // Currently only supports the Pearson Correlation Coefficient.
+  optional string method = 4;
+}
+
 // Replaces null values.
 // It will invoke 'Dataset.na.fill' (same as 'DataFrameNaFunctions.fill') to 
compute the results.
 // Following 3 parameter combinations are supported:
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 9e3346d9364..3bd713a9710 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -387,6 +387,22 @@ package object dsl {
   .build()
   }
 
+  def corr(col1: String, col2: String, method: String): Relation = {
+Relation
+  .newBuilder()
+  .setCorr(
+proto.StatCorr
+  .newBuilder()
+  .setInput(logicalPlan)
+  .setCol1(col1)
+  .setCol2(col2)
+  .setMethod(method)
+  .build())
+  .build()
+  }
+
+  def corr(col1: String, col2: String): Relation = corr(col1, col2, "pearson")
+
   def crosstab(col1: String, col2: String): Relation = {
 Relation
   .newBuilder()
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index d06787e6b14..bb582e92755 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -89,6 +89,7 @@ class SparkConnectPlanner(session: SparkSession) {
   case proto.Relation.RelTypeCase.SUMMARY => 
transformStatSummary(rel.getSummary)
   case proto.Relation.RelTypeCase.DESCRIBE => 
transformStatDescribe(rel.getDescribe)
   case proto.Relation.RelTypeCase.COV => transformStatCov(rel.getCov)
+  case 

[spark] branch master updated: [SPARK-41760][BUILD][CONNECT] Enforce scalafmt for Connect Client module

2022-12-29 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new af8442ee903 [SPARK-41760][BUILD][CONNECT] Enforce scalafmt for Connect 
Client module
af8442ee903 is described below

commit af8442ee9036bcf5b864d863d7a918b8fe9dcafd
Author: dengziming 
AuthorDate: Fri Dec 30 12:08:03 2022 +0800

[SPARK-41760][BUILD][CONNECT] Enforce scalafmt for Connect Client module

### What changes were proposed in this pull request?
1. This change enables enforcing `scalafmt` for the Connect client module, since it is a new module.
2. This change applies `scalafmt` to the existing code base. The suggested formatting command is shown below.
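
For reference, the formatting command that the updated `dev/lint-scala` script will suggest (taken verbatim from the diff below):
```
./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false -Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl connector/connect/server -pl connector/connect/client/jvm
```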

### Why are the changes needed?
Faster, more focused code reviews.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Faster, more focused code reviews.

Closes #39274 from dengziming/SPARK-41760.

Authored-by: dengziming 
Signed-off-by: Wenchen Fan 
---
 .../scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala | 3 ++-
 dev/lint-scala | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index beaae6412be..e188ef0d409 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -23,7 +23,8 @@ class SparkConnectClient(private val userContext: 
proto.UserContext) {
 
   /**
* Placeholder method.
-   * @return User ID.
+   * @return
+   *   User ID.
*/
   def userId: String = userContext.getUserId()
 }
diff --git a/dev/lint-scala b/dev/lint-scala
index 48ecf57ef47..2549f775e49 100755
--- a/dev/lint-scala
+++ b/dev/lint-scala
@@ -30,13 +30,14 @@ ERRORS=$(./build/mvn \
 -Dscalafmt.validateOnly=true \
 -Dscalafmt.changedOnly=false \
 -pl connector/connect/server \
+-pl connector/connect/client/jvm \
 2>&1 | grep -e "^Requires formatting" \
 )
 
 if test ! -z "$ERRORS"; then
   echo -e "The scalafmt check failed on connector/connect at following 
occurrences:\n\n$ERRORS\n"
   echo "Before submitting your change, please make sure to format your code 
using the following command:"
-  echo "./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false 
-Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl 
connector/connect/server"
+  echo "./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false 
-Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl 
connector/connect/server -pl connector/connect/client/jvm"
   exit 1
 else
   echo -e "Scalafmt checks passed."


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (63a25dbea55 -> 8ff06e55824)

2022-12-29 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 63a25dbea55 [SPARK-41743][CONNECT] Re-enable doc tests for group.py
 add 8ff06e55824 [SPARK-41774][PYTHON][TESTS] Remove duplicated 
`test_vectorized_udf_unsupported_types` test

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py | 9 -
 1 file changed, 9 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (37500780f8d -> 63a25dbea55)

2022-12-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 37500780f8d [SPARK-41292][CONNECT] Support Window in 
pyspark.sql.window namespace
 add 63a25dbea55 [SPARK-41743][CONNECT] Re-enable doc tests for group.py

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/group.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace

2022-12-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 37500780f8d [SPARK-41292][CONNECT] Support Window in 
pyspark.sql.window namespace
37500780f8d is described below

commit 37500780f8d3475aa6eec49036363bba852f1498
Author: Hyukjin Kwon 
AuthorDate: Fri Dec 30 10:23:23 2022 +0800

[SPARK-41292][CONNECT] Support Window in pyspark.sql.window namespace

### What changes were proposed in this pull request?

This PR proposes to support Spark Connect's Window in `pyspark.sql.window` 
namespace.

https://github.com/apache/spark/pull/39041 implemented the base, and 
https://github.com/apache/spark/pull/39149 implemented Spark Connect's Window.

This PR connects them.

### Why are the changes needed?

To provide the users the same usage, see also 
https://github.com/apache/spark/pull/39041.

### Does this PR introduce _any_ user-facing change?

Yes, see also https://github.com/apache/spark/pull/39041.
Spark Connect can use Window functions via the same namespace 
`pyspark.sql.window`.
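
A small usage sketch (assuming an active SparkSession `spark`); with this change `Window` is picked up from the same `pyspark.sql.window` namespace under Spark Connect as well:
```
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

df = spark.createDataFrame([(2, "Alice"), (5, "Bob"), (7, "Bob")], ["age", "name"])
window = Window.partitionBy("name").orderBy("age")

# Rank rows within each name partition by age.
df.withColumn("rank", rank().over(window)).show()
```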

### How was this patch tested?

Manually checked the related unittests.

Closes #39290 from HyukjinKwon/SPARK-41292.

Authored-by: Hyukjin Kwon 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/column.py |  2 --
 python/pyspark/sql/connect/window.py | 24 
 python/pyspark/sql/utils.py  | 25 ++---
 python/pyspark/sql/window.py | 20 +---
 4 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/python/pyspark/sql/connect/column.py 
b/python/pyspark/sql/connect/column.py
index 5025fc8e197..206d30b15d8 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -462,8 +462,6 @@ def _test() -> None:
 # TODO(SPARK-41771): __getitem__ does not work with Column.isin
 del pyspark.sql.connect.column.Column.getField.__doc__
 del pyspark.sql.connect.column.Column.getItem.__doc__
-# TODO(SPARK-41758): Support Window functions
-del pyspark.sql.connect.column.Column.over.__doc__
 
 (failure_count, test_count) = doctest.testmod(
 pyspark.sql.connect.column,
diff --git a/python/pyspark/sql/connect/window.py 
b/python/pyspark/sql/connect/window.py
index c54157d0dbb..24b057022bf 100644
--- a/python/pyspark/sql/connect/window.py
+++ b/python/pyspark/sql/connect/window.py
@@ -113,8 +113,6 @@ class WindowSpec:
 frame=self._frame,
 )
 
-partitionBy.__doc__ = PySparkWindowSpec.partitionBy.__doc__
-
 def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> 
"WindowSpec":
 _cols: List[ColumnOrName] = []
 for col in cols:
@@ -149,8 +147,6 @@ class WindowSpec:
 frame=self._frame,
 )
 
-orderBy.__doc__ = PySparkWindowSpec.orderBy.__doc__
-
 def rowsBetween(self, start: int, end: int) -> "WindowSpec":
 if not isinstance(start, int):
 raise TypeError(f"start must be a int, but got 
{type(start).__name__}")
@@ -168,8 +164,6 @@ class WindowSpec:
 frame=WindowFrame(isRowFrame=True, start=start, end=end),
 )
 
-rowsBetween.__doc__ = PySparkWindowSpec.rowsBetween.__doc__
-
 def rangeBetween(self, start: int, end: int) -> "WindowSpec":
 if not isinstance(start, int):
 raise TypeError(f"start must be a int, but got 
{type(start).__name__}")
@@ -187,8 +181,6 @@ class WindowSpec:
 frame=WindowFrame(isRowFrame=False, start=start, end=end),
 )
 
-rangeBetween.__doc__ = PySparkWindowSpec.rangeBetween.__doc__
-
 def __repr__(self) -> str:
 strs: List[str] = []
 if len(self._partitionSpec) > 0:
@@ -202,6 +194,10 @@ class WindowSpec:
 return "WindowSpec(" + ", ".join(strs) + ")"
 
 
+WindowSpec.rangeBetween.__doc__ = PySparkWindowSpec.rangeBetween.__doc__
+WindowSpec.rowsBetween.__doc__ = PySparkWindowSpec.rowsBetween.__doc__
+WindowSpec.orderBy.__doc__ = PySparkWindowSpec.orderBy.__doc__
+WindowSpec.partitionBy.__doc__ = PySparkWindowSpec.partitionBy.__doc__
 WindowSpec.__doc__ = PySparkWindowSpec.__doc__
 
 
@@ -221,27 +217,23 @@ class Window:
 def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> 
"WindowSpec":
 return Window._spec.partitionBy(*cols)
 
-partitionBy.__doc__ = PySparkWindow.partitionBy.__doc__
-
 @staticmethod
 def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> 
"WindowSpec":
 return Window._spec.orderBy(*cols)
 
-orderBy.__doc__ = PySparkWindow.orderBy.__doc__
-
 @staticmethod
 def rowsBetween(start: int, end: int) -> "WindowSpec":
 return Window._spec.rowsBetween(start, end)
 
-

[spark] branch master updated: [SPARK-41742] Support df.groupBy().agg({"*":"count"})

2022-12-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2d028a2ec19 [SPARK-41742] Support df.groupBy().agg({"*":"count"})
2d028a2ec19 is described below

commit 2d028a2ec19f1a9e41e3b2e893c412bd28ab53a4
Author: Martin Grund 
AuthorDate: Fri Dec 30 10:22:00 2022 +0800

[SPARK-41742] Support df.groupBy().agg({"*":"count"})

### What changes were proposed in this pull request?
Compatibility changes to support `count(*)` for DF operations that are 
rewritten into `count(1)`.
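
For reference, a minimal sketch of the call this enables (assuming a session `spark`; it mirrors the doctest re-enabled below):
```
df = spark.createDataFrame([(2, "Alice"), (5, "Bob"), (6, "Bob")], ["age", "name"])

# The {"*": "count"} entry is rewritten into count(1) under the hood.
df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
```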

### Why are the changes needed?
Compatibility.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #39298 from grundprinzip/SPARK-41742.

Authored-by: Martin Grund 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/group.py | 8 +++-
 python/pyspark/sql/group.py | 4 +---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/group.py 
b/python/pyspark/sql/connect/group.py
index 4c074d6da1b..fd6f9816e2d 100644
--- a/python/pyspark/sql/connect/group.py
+++ b/python/pyspark/sql/connect/group.py
@@ -80,8 +80,14 @@ class GroupedData:
 
 assert exprs, "exprs should not be empty"
 if len(exprs) == 1 and isinstance(exprs[0], dict):
+# There is a special case for count(*) which is rewritten into count(1).
 # Convert the dict into key value pairs
-aggregate_cols = [scalar_function(exprs[0][k], col(k)) for k in exprs[0]]
+aggregate_cols = [
+    scalar_function(
+        exprs[0][k], lit(1) if exprs[0][k] == "count" and k == "*" else col(k)
+    )
+    for k in exprs[0]
+]
 else:
 # Columns
 assert all(isinstance(c, Column) for c in exprs), "all exprs 
should be Column"
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index ac661e39741..10468988186 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -78,8 +78,6 @@ class GroupedData(PandasGroupedOpsMixin):
 def agg(self, __exprs: Dict[str, str]) -> DataFrame:
 ...
 
-# TODO(SPARK-41279): Enable the doctest with supporting the star in Spark 
Connect.
-# TODO(SPARK-41743): groupBy(...).agg(...).sort does not actually sort the 
output
 def agg(self, *exprs: Union[Column, Dict[str, str]]) -> DataFrame:
 """Compute aggregates and returns the result as a :class:`DataFrame`.
 
@@ -135,7 +133,7 @@ class GroupedData(PandasGroupedOpsMixin):
 
 Group-by name, and count each group.
 
->>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()  # doctest: +SKIP
+>>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
 +-++
 | name|count(1)|
 +-++


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (cfdbfb7349a -> ccbd9a7b98d)

2022-12-29 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from cfdbfb7349a [SPARK-41726][SQL] Remove 
`OptimizedCreateHiveTableAsSelectCommand`
 add ccbd9a7b98d [SPARK-41778][SQL] Add an alias "reduce" to ArrayAggregate

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../expressions/higherOrderFunctions.scala |  66 +---
 .../sql-functions/sql-expression-schema.md |   1 +
 .../sql-tests/inputs/higher-order-functions.sql|   6 +
 .../results/ansi/higher-order-functions.sql.out|  38 +
 .../results/higher-order-functions.sql.out |  38 +
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 184 +++--
 7 files changed, 227 insertions(+), 107 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand`

2022-12-29 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cfdbfb7349a [SPARK-41726][SQL] Remove 
`OptimizedCreateHiveTableAsSelectCommand`
cfdbfb7349a is described below

commit cfdbfb7349a6c7765b0172c23f133d39196354b0
Author: ulysses-you 
AuthorDate: Thu Dec 29 17:02:00 2022 -0800

[SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand`

### What changes were proposed in this pull request?

This PR removes `OptimizedCreateHiveTableAsSelectCommand` and moves the code 
that turns `InsertIntoHiveTable` into `InsertIntoHadoopFsRelationCommand` into 
the `RelationConversions` rule.

### Why are the changes needed?

CTAS uses a nested execution to do the data writing, so it is unnecessary to 
have `OptimizedCreateHiveTableAsSelectCommand`. The inner `InsertIntoHiveTable` 
would be converted to `InsertIntoHadoopFsRelationCommand` if possible.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

fix test

Closes #39263 from ulysses-you/SPARK-41726.

Authored-by: ulysses-you 
Signed-off-by: Dongjoon Hyun 
---
 .../org/apache/spark/sql/hive/HiveStrategies.scala |  32 +-
 .../execution/CreateHiveTableAsSelectCommand.scala | 114 -
 .../sql/hive/execution/HiveExplainSuite.scala  |  24 -
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  98 ++
 4 files changed, 104 insertions(+), 164 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 42bf1e31bb0..af727f966e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, 
InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, 
InsertIntoDataSourceDirCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTable, 
DataSourceStrategy}
+import org.apache.spark.sql.execution.datasources.{CreateTable, 
DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, 
LogicalRelation}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
 import org.apache.spark.sql.internal.HiveSerDe
@@ -232,15 +233,36 @@ case class RelationConversions(
   if DDLUtils.isHiveTable(relation.tableMeta) && 
isConvertible(relation) =>
 metastoreCatalog.convert(relation, isWrite = false)
 
-  // CTAS
-  case CreateTable(tableDesc, mode, Some(query))
+  // CTAS path
+  // This `InsertIntoHiveTable` is derived from 
`CreateHiveTableAsSelectCommand`,
+  // that only matches table insertion inside Hive CTAS.
+  // This pattern would not cause conflicts because this rule is always 
applied before
+  // `HiveAnalysis` and both of these rules are running once.
+  case InsertIntoHiveTable(tableDesc, _, query, overwrite, 
ifPartitionNotExists, _)
   if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
 tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) 
&&
 conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
 // validation is required to be done here before relation conversion.
 DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
-OptimizedCreateHiveTableAsSelectCommand(
-  tableDesc, query, query.output.map(_.name), mode)
+val hiveTable = DDLUtils.readHiveTable(tableDesc)
+val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = 
true) match {
+  case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
+  case _ => throw 
QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
+tableDesc.identifier)
+}
+InsertIntoHadoopFsRelationCommand(
+  hadoopRelation.location.rootPaths.head,
+  Map.empty, // We don't support to convert partitioned table.
+  ifPartitionNotExists,
+  Seq.empty, // We don't support to convert partitioned table.
+  hadoopRelation.bucketSpec,
+  hadoopRelation.fileFormat,
+  hadoopRelation.options,
+  query,
+  if (overwrite) SaveMode.Overwrite else SaveMode.Append,
+  Some(tableDesc),
+  

[spark] branch master updated: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

2022-12-29 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bb18703fdbf [SPARK-41429][UI] Protobuf serializer for 
RDDOperationGraphWrapper
bb18703fdbf is described below

commit bb18703fdbfbe4f7887abebd75beb37af662d0f3
Author: Sandeep Singh 
AuthorDate: Thu Dec 29 16:14:07 2022 -0800

[SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

### What changes were proposed in this pull request?
Add Protobuf serializer for RDDOperationGraphWrapper

### Why are the changes needed?
Support fast and compact serialization/deserialization for 
RDDOperationGraphWrapper over RocksDB.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New UT

Closes #39110 from techaddict/SPARK-41429-RDDOperationGraphWrapper.

Authored-by: Sandeep Singh 
Signed-off-by: Gengliang Wang 
---
 .../apache/spark/status/protobuf/store_types.proto |  35 ++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../RDDOperationGraphWrapperSerializer.scala   | 120 +
 .../protobuf/KVStoreProtobufSerializerSuite.scala  |  84 +++
 4 files changed, 240 insertions(+)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 22e22eea1a2..e9150490746 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -421,3 +421,38 @@ message SparkPlanGraphWrapper {
   repeated SparkPlanGraphNodeWrapper nodes = 2;
   repeated SparkPlanGraphEdge edges = 3;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+message RDDOperationNode {
+  enum DeterministicLevel {
+UNSPECIFIED = 0;
+DETERMINATE = 1;
+UNORDERED = 2;
+INDETERMINATE = 3;
+  }
+  int32 id = 1;
+  string name = 2;
+  bool cached = 3;
+  bool barrier = 4;
+  string callsite = 5;
+  DeterministicLevel output_deterministic_level = 6;
+}
+
+message RDDOperationClusterWrapper {
+  string id = 1;
+  string name = 2;
+  repeated RDDOperationNode child_nodes = 3;
+  repeated RDDOperationClusterWrapper child_clusters = 4;
+}
+
+message RDDOperationGraphWrapper {
+  int64 stage_id = 1;
+  repeated RDDOperationEdge edges = 2;
+  repeated RDDOperationEdge outgoing_edges = 3;
+  repeated RDDOperationEdge incoming_edges = 4;
+  RDDOperationClusterWrapper root_cluster = 5;
+}
diff --git 
a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
 
b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 39127e6a28c..4e39d9ecdc0 100644
--- 
a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ 
b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -27,3 +27,4 @@ 
org.apache.spark.status.protobuf.ResourceProfileWrapperSerializer
 org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ProcessSummaryWrapperSerializer
+org.apache.spark.status.protobuf.RDDOperationGraphWrapperSerializer
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
new file mode 100644
index 000..8975062082c
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, 
RDDOperationGraphWrapper}
+import 

[spark] branch master updated: [SQL][MINOR] Use Diamond operator for constructing HashMap

2022-12-29 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 18488158bee [SQL][MINOR] Use Diamond operator for constructing HashMap
18488158bee is described below

commit 18488158beee5435f99899f99b2e90fb6e37f3d5
Author: Ted Yu 
AuthorDate: Thu Dec 29 08:07:50 2022 -0600

[SQL][MINOR] Use Diamond operator for constructing HashMap

### What changes were proposed in this pull request?
This PR uses the diamond operator when constructing HashMap and Tuple2 
instances, relying on type inference instead of repeating the type arguments.

### Why are the changes needed?
The change follows standard Java practice for creating a new HashMap.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing test suite.

Closes #39250 from tedyu/hash-map.

Authored-by: Ted Yu 
Signed-off-by: Sean Owen 
---
 .../test/org/apache/spark/sql/JavaBeanDeserializationSuite.java | 6 +++---
 .../java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java   | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
index da626b4d873..66c985bdda0 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
@@ -590,9 +590,9 @@ public class JavaBeanDeserializationSuite implements 
Serializable {
 .reduceGroups(rf);
 
 List> expectedRecords = Arrays.asList(
-new Tuple2("a", new Item("a", 8)),
-new Tuple2("b", new Item("b", 3)),
-new Tuple2("c", new Item("c", 2)));
+new Tuple2<>("a", new Item("a", 8)),
+new Tuple2<>("b", new Item("b", 3)),
+new Tuple2<>("c", new Item("c", 2)));
 
 List> result = finalDs.collectAsList();
 
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java
index 1836cc403c3..bee77616b7e 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaColumnExpressionSuite.java
@@ -86,7 +86,7 @@ public class JavaColumnExpressionSuite {
 AnalysisException e = Assert.assertThrows(AnalysisException.class,
   () -> df.filter(df.col("a").isInCollection(Arrays.asList(new 
Column("b");
 
Assert.assertTrue(e.getErrorClass().equals("DATATYPE_MISMATCH.DATA_DIFF_TYPES"));
-Map<String, String> messageParameters = new HashMap<String, String>();
+Map<String, String> messageParameters = new HashMap<>();
 messageParameters.put("functionName", "`in`");
 messageParameters.put("dataType", "[\"INT\", \"ARRAY\"]");
 messageParameters.put("sqlExpr", "\"(a IN (b))\"");


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (908218ee149 -> a4a800727fc)

2022-12-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 908218ee149 [SPARK-41767][CONNECT][TESTS][FOLLOW-UP] Disable the 
doctests for dropFields and withField
 add a4a800727fc [SPARK-41655][CONNECT][DOCS][FOLLOW-UP] Update related 
JIRAs for skipped tests

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/column.py | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41767][CONNECT][TESTS][FOLLOW-UP] Disable the doctests for dropFields and withField

2022-12-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 908218ee149 [SPARK-41767][CONNECT][TESTS][FOLLOW-UP] Disable the 
doctests for dropFields and withField
908218ee149 is described below

commit 908218ee149b9faf1738a3389781e7a2c825b8a8
Author: Hyukjin Kwon 
AuthorDate: Thu Dec 29 21:41:07 2022 +0900

[SPARK-41767][CONNECT][TESTS][FOLLOW-UP] Disable the doctests for 
dropFields and withField

### What changes were proposed in this pull request?

There is a logical conflict between 
https://github.com/apache/spark/pull/39249 and 
https://github.com/apache/spark/pull/39283. This PR fixes it and files a 
related JIRA.

### Why are the changes needed?

To recover the build.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually tested via:

```bash
./python/run-tests --testnames 'pyspark.sql.connect.column'
```

Closes #39288 from HyukjinKwon/SPARK-41767.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/column.py | 4 
 1 file changed, 4 insertions(+)

diff --git a/python/pyspark/sql/connect/column.py 
b/python/pyspark/sql/connect/column.py
index 2667e795974..39de3cdd126 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -443,6 +443,10 @@ def _test() -> None:
 os.environ["SPARK_REMOTE"] = "sc://localhost"
 globs["spark"] = 
PySparkSession.builder.remote("sc://localhost").getOrCreate()
 
+# TODO(SPARK-41746): SparkSession.createDataFrame does not support nested datatypes
+del pyspark.sql.connect.column.Column.dropFields.__doc__
+# TODO(SPARK-41772): Enable pyspark.sql.connect.column.Column.withField doctest
+del pyspark.sql.connect.column.Column.withField.__doc__
 # TODO(SPARK-41751): Support Column.bitwiseAND,bitwiseOR,bitwiseXOR,eqNullSafe,isNotNull,
 # isNull,isin
 del pyspark.sql.connect.column.Column.bitwiseAND.__doc__


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41767][CONNECT][PYTHON] Implement `Column.{withField, dropFields}`

2022-12-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 86f6dde3079 [SPARK-41767][CONNECT][PYTHON] Implement 
`Column.{withField, dropFields}`
86f6dde3079 is described below

commit 86f6dde30798e69c7a953ee59788a4a9831b37cd
Author: Ruifeng Zheng 
AuthorDate: Thu Dec 29 20:57:01 2022 +0900

[SPARK-41767][CONNECT][PYTHON] Implement `Column.{withField, dropFields}`

### What changes were proposed in this pull request?
Implement `Column.{withField, dropFields}`
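
A minimal usage sketch of the two methods (assuming a classic PySpark session `spark`; the Connect client now mirrors the same `Column` API):
```
from pyspark.sql import Row
from pyspark.sql.functions import col, lit

df = spark.createDataFrame([Row(a=Row(b=1, c=2))])

# Replace (or add) a nested struct field ...
df.withColumn("a", col("a").withField("b", lit(3))).show()

# ... and drop a nested struct field.
df.withColumn("a", col("a").dropFields("c")).show()
```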

### Why are the changes needed?
For API coverage

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
added UT

Closes #39283 from zhengruifeng/connect_column_field.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../main/protobuf/spark/connect/expressions.proto  |  15 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  17 +++
 python/pyspark/sql/column.py   |   6 +
 python/pyspark/sql/connect/column.py   |  37 +-
 python/pyspark/sql/connect/expressions.py  |  53 
 .../pyspark/sql/connect/proto/expressions_pb2.py   |  93 +++--
 .../pyspark/sql/connect/proto/expressions_pb2.pyi  |  53 
 .../sql/tests/connect/test_connect_column.py   | 147 +++--
 8 files changed, 366 insertions(+), 55 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto
index b8ed9eb6f23..fa2836702c6 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto
@@ -41,6 +41,7 @@ message Expression {
 LambdaFunction lambda_function = 10;
 Window window = 11;
 UnresolvedExtractValue unresolved_extract_value = 12;
+UpdateFields update_fields = 13;
   }
 
 
@@ -241,6 +242,20 @@ message Expression {
 Expression extraction = 2;
   }
 
+  // Add, replace or drop a field of `StructType` expression by name.
+  message UpdateFields {
+// (Required) The struct expression.
+Expression struct_expression = 1;
+
+// (Required) The field name.
+string field_name = 2;
+
+// (Optional) The expression to add or replace.
+//
+// When not set, it means this field will be dropped.
+Expression value_expression = 3;
+  }
+
   message Alias {
 // (Required) The expression that alias will be added on.
 Expression expr = 1;
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 4bb90fc5bc0..d06787e6b14 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -596,6 +596,8 @@ class SparkConnectPlanner(session: SparkSession) {
 transformUnresolvedRegex(exp.getUnresolvedRegex)
   case proto.Expression.ExprTypeCase.UNRESOLVED_EXTRACT_VALUE =>
 transformUnresolvedExtractValue(exp.getUnresolvedExtractValue)
+  case proto.Expression.ExprTypeCase.UPDATE_FIELDS =>
+transformUpdateFields(exp.getUpdateFields)
   case proto.Expression.ExprTypeCase.SORT_ORDER => 
transformSortOrder(exp.getSortOrder)
   case proto.Expression.ExprTypeCase.LAMBDA_FUNCTION =>
 transformLambdaFunction(exp.getLambdaFunction)
@@ -860,6 +862,21 @@ class SparkConnectPlanner(session: SparkSession) {
   transformExpression(extract.getExtraction))
   }
 
+  private def transformUpdateFields(update: proto.Expression.UpdateFields): 
UpdateFields = {
+if (update.hasValueExpression) {
+  // add or replace a field
+  UpdateFields.apply(
+col = transformExpression(update.getStructExpression),
+fieldName = update.getFieldName,
+expr = transformExpression(update.getValueExpression))
+} else {
+  // drop a field
+  UpdateFields.apply(
+col = transformExpression(update.getStructExpression),
+fieldName = update.getFieldName)
+}
+  }
+
   private def transformWindowExpression(window: proto.Expression.Window) = {
 if (!window.hasWindowFunction) {
   throw InvalidPlanInput(s"WindowFunction is required in WindowExpression")
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 5a0987b4cfe..cd7b6932c2f 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -522,6 +522,9 @@ class Column:
 
 .. versionadded:: 3.1.0
 
+.. versionchanged:: 3.4.0

[spark] branch master updated: [SPARK-41655][CONNECT] Enable doctests in pyspark.sql.connect.column

2022-12-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 48c5843ad4b [SPARK-41655][CONNECT] Enable doctests in 
pyspark.sql.connect.column
48c5843ad4b is described below

commit 48c5843ad4b06e85e07d1db5b308a460209f6126
Author: Sandeep Singh 
AuthorDate: Thu Dec 29 20:52:35 2022 +0900

[SPARK-41655][CONNECT] Enable doctests in pyspark.sql.connect.column

### What changes were proposed in this pull request?
This PR proposes to enable doctests in pyspark.sql.connect.column, which is 
virtually the same as pyspark.sql.column.

### Why are the changes needed?
To ensure PySpark compatibility and test coverage.

### Does this PR introduce any user-facing change?
No, doctest's only.

### How was this patch tested?
New Doctests Added

Closes #39249 from techaddict/SPARK-41655-pyspark.sql.connect.column.

Lead-authored-by: Sandeep Singh 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 dev/sparktestsupport/modules.py  |  1 +
 python/pyspark/sql/column.py | 15 -
 python/pyspark/sql/connect/column.py | 60 
 3 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 558d058f3e5..df3a1f180fc 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -507,6 +507,7 @@ pyspark_connect = Module(
 "pyspark.sql.connect.catalog",
 "pyspark.sql.connect.group",
 "pyspark.sql.connect.window",
+"pyspark.sql.connect.column",
 # unittests
 "pyspark.sql.tests.connect.test_connect_column_expressions",
 "pyspark.sql.tests.connect.test_connect_plan_only",
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 3bc49ef8031..5a0987b4cfe 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -182,6 +182,9 @@ def _reverse_op(
 return _
 
 
+# TODO(SPARK-41757): Compatibility of string representation for Column
+
+
 class Column:
 
 """
@@ -200,17 +203,16 @@ class Column:
 ...  [(2, "Alice"), (5, "Bob")], ["age", "name"])
 
 Select a column out of a DataFrame
-
->>> df.name
+>>> df.name   # doctest: +SKIP
 Column<'name'>
->>> df["name"]
+>>> df["name"]  # doctest: +SKIP
 Column<'name'>
 
 Create from an expression
 
->>> df.age + 1
+>>> df.age + 1  # doctest: +SKIP
 Column<'(age + 1)'>
->>> 1 / df.age
+>>> 1 / df.age  # doctest: +SKIP
 Column<'(1 / age)'>
 """
 
@@ -1258,8 +1260,7 @@ class Column:
 >>> from pyspark.sql import Window
 >>> window = Window.partitionBy("name").orderBy("age") \
 .rowsBetween(Window.unboundedPreceding, Window.currentRow)
->>> from pyspark.sql.functions import rank, min
->>> from pyspark.sql.functions import desc
+>>> from pyspark.sql.functions import rank, min, desc
 >>> df = spark.createDataFrame(
 ...  [(2, "Alice"), (5, "Bob")], ["age", "name"])
 >>> df.withColumn("rank", rank().over(window)) \
diff --git a/python/pyspark/sql/connect/column.py 
b/python/pyspark/sql/connect/column.py
index b873a757e41..58d86a3d389 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -28,6 +28,7 @@ from typing import (
 Optional,
 )
 
+from pyspark import SparkContext, SparkConf
 from pyspark.sql.types import DataType
 from pyspark.sql.column import Column as PySparkColumn
 
@@ -390,3 +391,62 @@ class Column:
 
 
 Column.__doc__ = PySparkColumn.__doc__
+
+
+def _test() -> None:
+import os
+import sys
+import doctest
+from pyspark.sql import SparkSession as PySparkSession
+from pyspark.testing.connectutils import should_test_connect, 
connect_requirement_message
+
+os.chdir(os.environ["SPARK_HOME"])
+
+if should_test_connect:
+import pyspark.sql.connect.column
+
+globs = pyspark.sql.connect.column.__dict__.copy()
+# Works around to create a regular Spark session
+sc = SparkContext("local[4]", "sql.connect.column tests", 
conf=SparkConf())
+globs["_spark"] = PySparkSession(sc, options={"spark.app.name": 
"sql.connect.column tests"})
+
+# Creates a remote Spark session.
+os.environ["SPARK_REMOTE"] = "sc://localhost"
+globs["spark"] = 
PySparkSession.builder.remote("sc://localhost").getOrCreate()
+
+# TODO(SPARK-41751): Support 
Column.bitwiseAND,bitwiseOR,bitwiseXOR,eqNullSafe,isNotNull,
+# isNull,isin
+del pyspark.sql.connect.column.Column.bitwiseAND.__doc__
+del pyspark.sql.connect.column.Column.bitwiseOR.__doc__
+del 

[spark] branch master updated: [SPARK-41344][SQL] Make error clearer when table not found in SupportsCatalogOptions catalog

2022-12-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b331ad0ae2b [SPARK-41344][SQL] Make error clearer when table not found 
in SupportsCatalogOptions catalog
b331ad0ae2b is described below

commit b331ad0ae2b6ab566aea2ddbdbddcd3d28f8eaa1
Author: Zhen Wang <643348...@qq.com>
AuthorDate: Thu Dec 29 20:25:17 2022 +0900

[SPARK-41344][SQL] Make error clearer when table not found in 
SupportsCatalogOptions catalog

### What changes were proposed in this pull request?

Make error clearer when table not found in SupportsCatalogOptions catalog.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #38871 from wForget/SPARK-41344.

Lead-authored-by: Zhen Wang <643348...@qq.com>
Co-authored-by: wForget <643348...@qq.com>
Signed-off-by: Hyukjin Kwon 
---
 .../sql/connector/catalog/CatalogV2Util.scala  | 27 ++
 .../datasources/v2/DataSourceV2Utils.scala |  2 +-
 .../connector/SupportsCatalogOptionsSuite.scala|  9 +++-
 3 files changed, 26 insertions(+), 12 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index abd43065048..72c557c8d77 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -330,22 +330,29 @@ private[sql] object CatalogV2Util {
   ident: Identifier,
   timeTravelSpec: Option[TimeTravelSpec] = None): Option[Table] =
 try {
-  if (timeTravelSpec.nonEmpty) {
-timeTravelSpec.get match {
-  case v: AsOfVersion =>
-Option(catalog.asTableCatalog.loadTable(ident, v.version))
-  case ts: AsOfTimestamp =>
-Option(catalog.asTableCatalog.loadTable(ident, ts.timestamp))
-}
-  } else {
-Option(catalog.asTableCatalog.loadTable(ident))
-  }
+  Option(getTable(catalog, ident, timeTravelSpec))
 } catch {
   case _: NoSuchTableException => None
   case _: NoSuchDatabaseException => None
   case _: NoSuchNamespaceException => None
 }
 
+  def getTable(
+  catalog: CatalogPlugin,
+  ident: Identifier,
+  timeTravelSpec: Option[TimeTravelSpec] = None): Table = {
+if (timeTravelSpec.nonEmpty) {
+  timeTravelSpec.get match {
+case v: AsOfVersion =>
+  catalog.asTableCatalog.loadTable(ident, v.version)
+case ts: AsOfTimestamp =>
+  catalog.asTableCatalog.loadTable(ident, ts.timestamp)
+  }
+} else {
+  catalog.asTableCatalog.loadTable(ident)
+}
+  }
+
   def loadFunction(catalog: CatalogPlugin, ident: Identifier): 
Option[UnboundFunction] = {
 try {
   Option(catalog.asFunctionCatalog.loadFunction(ident))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index f1d1cc5a173..c906e42a9b9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -134,7 +134,7 @@ private[sql] object DataSourceV2Utils extends Logging {
   None
 }
 val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, 
timeTravelVersion, conf)
-(CatalogV2Util.loadTable(catalog, ident, timeTravel).get, 
Some(catalog), Some(ident))
+(CatalogV2Util.getTable(catalog, ident, timeTravel), Some(catalog), 
Some(ident))
   case _ =>
 // TODO: Non-catalog paths for DSV2 are currently not well defined.
 val tbl = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, 
userSpecifiedSchema)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index f8278d18b0a..fd4f719417e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, 
TableAlreadyExistsException}

[spark] branch master updated (febe5418f49 -> 054a7845497)

2022-12-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from febe5418f49 [SPARK-41761][CONNECT][PYTHON] Fix arithmetic ops: 
`__neg__`, `__pow__`, `__rpow__`
 add 054a7845497 [SPARK-41764][CONNECT][PYTHON] Make the internal string op 
name consistent with FunctionRegistry

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/column.py   |  4 +-
 .../sql/tests/connect/test_connect_column.py   | 46 ++
 2 files changed, 48 insertions(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (4c7f8106bf1 -> febe5418f49)

2022-12-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 4c7f8106bf1 [SPARK-41751][CONNECT][PYTHON] Fix `Column.{isNull, 
isNotNull, eqNullSafe}`
 add febe5418f49 [SPARK-41761][CONNECT][PYTHON] Fix arithmetic ops: 
`__neg__`, `__pow__`, `__rpow__`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/column.py   |  6 +--
 .../sql/tests/connect/test_connect_column.py   | 50 ++
 2 files changed, 53 insertions(+), 3 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41751][CONNECT][PYTHON] Fix `Column.{isNull, isNotNull, eqNullSafe}`

2022-12-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4c7f8106bf1 [SPARK-41751][CONNECT][PYTHON] Fix `Column.{isNull, 
isNotNull, eqNullSafe}`
4c7f8106bf1 is described below

commit 4c7f8106bf145203d0b1aed5f6d5762e915c83ca
Author: Ruifeng Zheng 
AuthorDate: Thu Dec 29 16:39:10 2022 +0800

[SPARK-41751][CONNECT][PYTHON] Fix `Column.{isNull, isNotNull, eqNullSafe}`

### What changes were proposed in this pull request?
Fix `Column.{isNull, isNotNull, eqNullSafe}`

### Why are the changes needed?
they were wrongly implemented
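
For reference, a small sketch of the intended semantics this fix restores (assuming a DataFrame `df` with nullable columns `a` and `b`; the comments note the operator names the Connect client now maps to, per the diff below):
```
df = spark.createDataFrame([(1, None), (2, 2)], ["a", "b"])

df.select(
    df.a.isNull(),          # now mapped to the 'isnull' function
    df.b.isNotNull(),       # now mapped to the 'isnotnull' function
    df.a.eqNullSafe(df.b),  # now mapped to the '<=>' operator
).show()
```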

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
added UT

Closes #39273 from zhengruifeng/connect_column_fix_null.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/column.py   |  6 ++--
 .../sql/tests/connect/test_connect_column.py   | 40 ++
 2 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/connect/column.py 
b/python/pyspark/sql/connect/column.py
index 30916ecafbf..5248c1c0ab5 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -124,7 +124,7 @@ class Column:
 __ge__ = _bin_op(">=")
 __le__ = _bin_op("<=")
 
-eqNullSafe = _bin_op("eqNullSafe", PySparkColumn.eqNullSafe.__doc__)
+eqNullSafe = _bin_op("<=>", PySparkColumn.eqNullSafe.__doc__)
 
 __neg__ = _func_op("negate")
 
@@ -148,8 +148,8 @@ class Column:
 bitwiseAND = _bin_op("&", PySparkColumn.bitwiseAND.__doc__)
 bitwiseXOR = _bin_op("^", PySparkColumn.bitwiseXOR.__doc__)
 
-isNull = _unary_op("isNull", PySparkColumn.isNull.__doc__)
-isNotNull = _unary_op("isNotNull", PySparkColumn.isNotNull.__doc__)
+isNull = _unary_op("isnull", PySparkColumn.isNull.__doc__)
+isNotNull = _unary_op("isnotnull", PySparkColumn.isNotNull.__doc__)
 
 def __ne__(  # type: ignore[override]
 self,
diff --git a/python/pyspark/sql/tests/connect/test_connect_column.py 
b/python/pyspark/sql/tests/connect/test_connect_column.py
index 7eb30505504..e34f27aee98 100644
--- a/python/pyspark/sql/tests/connect/test_connect_column.py
+++ b/python/pyspark/sql/tests/connect/test_connect_column.py
@@ -112,6 +112,46 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 df4.filter(df4.name.isNotNull()).toPandas(),
 )
 
+def test_column_with_null(self):
+# SPARK-41751: test isNull, isNotNull, eqNullSafe
+from pyspark.sql import functions as SF
+from pyspark.sql.connect import functions as CF
+
+query = """
+SELECT * FROM VALUES
+(1, 1, NULL), (2, NULL, NULL), (3, 3, 1)
+AS tab(a, b, c)
+"""
+
+# +---+++
+# |  a|   b|   c|
+# +---+++
+# |  1|   1|null|
+# |  2|null|null|
+# |  3|   3|   1|
+# +---+++
+
+cdf = self.connect.sql(query)
+sdf = self.spark.sql(query)
+
+# test isNull
+self.assert_eq(
+    cdf.select(cdf.a.isNull(), cdf["b"].isNull(), CF.col("c").isNull()).toPandas(),
+    sdf.select(sdf.a.isNull(), sdf["b"].isNull(), SF.col("c").isNull()).toPandas(),
+)
+
+# test isNotNull
+self.assert_eq(
+    cdf.select(cdf.a.isNotNull(), cdf["b"].isNotNull(), CF.col("c").isNotNull()).toPandas(),
+    sdf.select(sdf.a.isNotNull(), sdf["b"].isNotNull(), SF.col("c").isNotNull()).toPandas(),
+)
+
+# test eqNullSafe
+self.assert_eq(
+    cdf.select(cdf.a.eqNullSafe(cdf.b), cdf["b"].eqNullSafe(CF.col("c"))).toPandas(),
+    sdf.select(sdf.a.eqNullSafe(sdf.b), sdf["b"].eqNullSafe(SF.col("c"))).toPandas(),
+)
+
 def test_invalid_ops(self):
 query = """
 SELECT * FROM VALUES


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org