[spark] branch master updated (b5a9e1fee37 -> 1979169052e)

2022-12-08 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b5a9e1fee37 [SPARK-41454][PYTHON] Support Python 3.11
 add 1979169052e Revert "[SPARK-41369][CONNECT] Remove unneeded connect 
server deps"

No new revisions were added by this update.

Summary of changes:
 connector/connect/server/pom.xml | 67 
 1 file changed, 67 insertions(+)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41454][PYTHON] Support Python 3.11

2022-12-08 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b5a9e1fee37 [SPARK-41454][PYTHON] Support Python 3.11
b5a9e1fee37 is described below

commit b5a9e1fee374b5d1e86021c7b49f18c5bb1b986e
Author: Dongjoon Hyun 
AuthorDate: Thu Dec 8 16:32:37 2022 -0800

[SPARK-41454][PYTHON] Support Python 3.11

### What changes were proposed in this pull request?

This PR aims to support Python 3.11.

### Why are the changes needed?

Python 3.11 is the newest major release of the Python programming language. It 
contains many new features and optimizations, and Python 3.11.1 is the latest 
patch release.

- 2022-12-03 https://www.python.org/downloads/release/python-3111/

In addition, Spark is affected by one API removal (deprecated in 3.9 and removed in 
3.11). Since this is handled conditionally, there is no regression on 
older Python versions.
- https://bugs.python.org/issue40465
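
(As an illustration only: a minimal sketch of the conditional guard, with a 
hypothetical helper name rather than the Spark code itself; the actual change is 
in the diff below.)

```python
import random
import sys

def shuffled_copy(items, seed):
    """Shuffle a copy of `items`, guarding the `random.shuffle` second
    argument that was deprecated in 3.9 and removed in 3.11 (bpo-40465)."""
    result = list(items)
    rnd = random.Random(seed)
    if sys.version_info < (3, 11):
        # Older Pythons still accept a custom random() callable.
        random.shuffle(result, rnd.random)
    else:
        # Python 3.11 dropped the second argument; use the seeded Random.
        rnd.shuffle(result)
    return result
```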

### Does this PR introduce _any_ user-facing change?

No. Previously, this was not supported.

### How was this patch tested?

Manually ran the following. Note that this was tested without optional 
dependencies.
```
$ python/run-tests.py --python-executables python3.11
Will test against the following Python executables: ['python3.11']
Will test the following Python modules: ['pyspark-connect', 'pyspark-core', 
'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 
'pyspark-resource', 'pyspark-sql', 'pyspark-streaming']
python3.11 python_implementation is CPython
python3.11 version is: Python 3.11.1
Starting test(python3.11): pyspark.ml.tests.test_evaluation (temp output: 
/Users/dongjoon/APACHE/spark-merge/python/target/ff09022a-f3d3-413b-b15d-261c40d5b048/python3.11__pyspark.ml.tests.test_evaluation__wh9c4y5l.log)
...
Finished test(python3.11): pyspark.sql.streaming.readwriter (88s)
Tests passed in 1138 seconds

...
Skipped tests in pyspark.tests.test_worker with python3.11:
test_memory_limit 
(pyspark.tests.test_worker.WorkerMemoryTest.test_memory_limit) ... skipped 
"Memory limit feature in Python worker is dependent on Python's 'resource' 
module on Linux; however, not found or not on Linux."
```

Closes #38987 from dongjoon-hyun/SPARK-41454.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/shuffle.py | 9 ++---
 python/setup.py   | 1 +
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 35c3397de50..da03110c321 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -78,9 +78,12 @@ def _get_local_dirs(sub):
     path = os.environ.get("SPARK_LOCAL_DIRS", "/tmp")
     dirs = path.split(",")
     if len(dirs) > 1:
-        # different order in different processes and instances
-        rnd = random.Random(os.getpid() + id(dirs))
-        random.shuffle(dirs, rnd.random)
+        if sys.version_info < (3, 11):
+            # different order in different processes and instances
+            rnd = random.Random(os.getpid() + id(dirs))
+            random.shuffle(dirs, rnd.random)
+        else:
+            random.shuffle(dirs)
     return [os.path.join(d, "python", str(os.getpid()), sub) for d in dirs]
 
 
diff --git a/python/setup.py b/python/setup.py
index af102f23083..65db3912efe 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -282,6 +282,7 @@ try:
 'Programming Language :: Python :: 3.8',
 'Programming Language :: Python :: 3.9',
 'Programming Language :: Python :: 3.10',
+'Programming Language :: Python :: 3.11',
 'Programming Language :: Python :: Implementation :: CPython',
 'Programming Language :: Python :: Implementation :: PyPy',
 'Typing :: Typed'],


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41452][SQL] `to_char` should return null when format is null

2022-12-08 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3433f2a77d3 [SPARK-41452][SQL] `to_char` should return null when 
format is null
3433f2a77d3 is described below

commit 3433f2a77d3dd665f42aa3d558152cf4c912c54c
Author: Bruce Robbins 
AuthorDate: Thu Dec 8 16:14:43 2022 -0800

[SPARK-41452][SQL] `to_char` should return null when format is null

### What changes were proposed in this pull request?

When a user specifies a null format in `to_char`, return null instead of 
throwing a `NullPointerException`.

### Why are the changes needed?

`to_char` currently throws a `NullPointerException` when the format is null:
```
spark-sql> select to_char(454, null);
[INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
...
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.ToCharacter.numberFormat$lzycompute(numberFormatExpressions.scala:227)
    at org.apache.spark.sql.catalyst.expressions.ToCharacter.numberFormat(numberFormatExpressions.scala:227)
    at org.apache.spark.sql.catalyst.expressions.ToCharacter.numberFormatter$lzycompute(numberFormatExpressions.scala:228)
    at org.apache.spark.sql.catalyst.expressions.ToCharacter.numberFormatter(numberFormatExpressions.scala:228)
    at org.apache.spark.sql.catalyst.expressions.ToCharacter.checkInputDataTypes(numberFormatExpressions.scala:236)
```
Compare to `to_binary`:
```
spark-sql> SELECT to_binary('abc', null);
NULL
Time taken: 3.097 seconds, Fetched 1 row(s)
spark-sql>
```
Also compare to `to_char` in PostgreSQL 14.6:
```
select to_char(454, null) is null as to_char_is_null;

 to_char_is_null
-----------------
 t
(1 row)
```
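
As a minimal PySpark sketch of the intended behavior, assuming a Spark build 
that contains this patch and an active SparkSession named `spark`:

```python
# Hypothetical sanity check, not part of this patch's test suite.
row = spark.sql("SELECT to_char(454, null) AS result").head()
assert row.result is None  # NULL instead of an internal NullPointerException
```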

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

Closes #38986 from bersprockets/to_char_issue.

Authored-by: Bruce Robbins 
Signed-off-by: Dongjoon Hyun 
---
 .../expressions/numberFormatExpressions.scala| 20 ++--
 .../expressions/StringExpressionsSuite.scala |  7 +++
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
index f5f86bfac19..2d4f0438db7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
@@ -224,17 +224,21 @@ case class TryToNumber(left: Expression, right: Expression)
   group = "string_funcs")
 case class ToCharacter(left: Expression, right: Expression)
   extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant {
-  private lazy val numberFormat = right.eval().toString.toUpperCase(Locale.ROOT)
-  private lazy val numberFormatter = new ToNumberParser(numberFormat, true)
+  private lazy val numberFormatter = {
+val value = right.eval()
+if (value != null) {
+  new ToNumberParser(value.toString.toUpperCase(Locale.ROOT), true)
+} else {
+  null
+}
+  }
 
   override def dataType: DataType = StringType
   override def inputTypes: Seq[AbstractDataType] = Seq(DecimalType, StringType)
   override def checkInputDataTypes(): TypeCheckResult = {
 val inputTypeCheck = super.checkInputDataTypes()
 if (inputTypeCheck.isSuccess) {
-  if (right.foldable) {
-numberFormatter.checkInputDataTypes()
-  } else {
+  if (!right.foldable) {
 DataTypeMismatch(
   errorSubClass = "NON_FOLDABLE_INPUT",
   messageParameters = Map(
@@ -243,6 +247,10 @@ case class ToCharacter(left: Expression, right: Expression)
 "inputExpr" -> toSQLExpr(right)
   )
 )
+  } else if (numberFormatter == null) {
+TypeCheckResult.TypeCheckSuccess
+  } else {
+numberFormatter.checkInputDataTypes()
   }
 } else {
   inputTypeCheck
@@ -260,7 +268,7 @@ case class ToCharacter(left: Expression, right: Expression)
 val result =
   code"""
  

[spark] branch branch-3.3 updated (02f32ee358c -> 1f074ff1560)

2022-12-08 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


from 02f32ee358c [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset
 add 1f074ff1560 [SPARK-41376][CORE][3.3] Correct the Netty 
preferDirectBufs check logic on executor start

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/spark/network/util/NettyUtils.java | 14 ++
 .../spark/executor/CoarseGrainedExecutorBackend.scala  |  5 -
 2 files changed, 18 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41008][MLLIB] Dedup isotonic regression duplicate features

2022-12-08 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3d05c7e037e [SPARK-41008][MLLIB] Dedup isotonic regression duplicate 
features
3d05c7e037e is described below

commit 3d05c7e037eff79de8ef9f6231aca8340bcc65ef
Author: Ahmed Mahran 
AuthorDate: Thu Dec 8 08:28:48 2022 -0600

[SPARK-41008][MLLIB] Dedup isotonic regression duplicate features

### What changes were proposed in this pull request?

Adding a pre-processing step to isotonic regression in mllib to handle 
duplicate features, matching the `sklearn` implementation. Input points with 
duplicate feature values are aggregated into a single point whose label is the 
weighted average of the labels of those points. All points for a given feature 
value are aggregated as follows (a standalone sketch of this rule appears after 
the list):
 - Aggregated label is the weighted average of all labels
 - Aggregated feature is the weighted average of all equal features. Feature 
values may be equal only up to some resolution due to representation errors; 
since we cannot know which feature value to use in that case, we compute the 
weighted average of the features. Ideally, all feature values are exactly equal 
and the weighted average is just the value at any point.
 - Aggregated weight is the sum of all weights
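
A standalone sketch of this aggregation rule in plain Python over 
`(label, feature, weight)` triples (grouping by exact feature equality for 
brevity — the real implementation tolerates a small resolution; the helper is 
illustrative, not the mllib code):

```python
from collections import defaultdict

def dedup_points(points):
    """Aggregate (label, feature, weight) points sharing a feature value:
    weighted-average label, weighted-average feature, summed weight."""
    groups = defaultdict(list)
    for label, feature, weight in points:
        groups[feature].append((label, feature, weight))
    aggregated = []
    for pts in groups.values():
        w_sum = sum(w for _, _, w in pts)
        label = sum(l * w for l, _, w in pts) / w_sum
        feature = sum(f * w for _, f, w in pts) / w_sum
        aggregated.append((label, feature, w_sum))
    return sorted(aggregated, key=lambda p: p[1])

# Example: the two points at feature 0.2 collapse into one whose label is
# (1*1 + 0*3) / 4 = 0.25 and whose weight is 4.0.
print(dedup_points([(1.0, 0.2, 1.0), (0.0, 0.2, 3.0), (1.0, 0.6, 1.0)]))
```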

### Why are the changes needed?

As per discussion on ticket 
[[SPARK-41008]](https://issues.apache.org/jira/browse/SPARK-41008), it is a bug 
and results should match `sklearn`.

### Does this PR introduce _any_ user-facing change?

There are no changes to the API, documentation or error messages. However, 
the user should expect results to change.

### How was this patch tested?

Existing test cases for duplicate features failed and were adjusted 
accordingly. Also, new tests were added.

Here is a python snippet that can be used to verify the results:

```python
from sklearn.isotonic import IsotonicRegression

def test(x, y, x_test, isotonic=True):
    ir = IsotonicRegression(out_of_bounds='clip', increasing=isotonic).fit(x, y)
    y_test = ir.predict(x_test)

    def print_array(label, a):
        print(f"{label}: [{', '.join([str(i) for i in a])}]")

    print_array("boundaries", ir.X_thresholds_)
    print_array("predictions", ir.y_thresholds_)
    print_array("y_test", y_test)

test(
    x = [0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20],
    y = [1, 0, 0, 1, 0, 1, 0, 0, 0],
    x_test = [0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20]
)
```

srowen zapletal-martin

Closes #38966 from ahmed-mahran/ml-isotonic-reg-dups.

Authored-by: Ahmed Mahran 
Signed-off-by: Sean Owen 
---
 .../mllib/regression/IsotonicRegression.scala  | 141 +---
 .../mllib/regression/IsotonicRegressionSuite.scala | 180 -
 2 files changed, 262 insertions(+), 59 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 649f9816e6a..0b2bf147501 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.mllib.regression
 
 import java.io.Serializable
@@ -24,6 +23,7 @@ import java.util.Arrays.binarySearch
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.commons.math3.util.Precision
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -307,6 +307,65 @@ class IsotonicRegression private (private var isotonic: 
Boolean) extends Seriali
 run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
   }
 
+  /**
+   * Aggregates points of duplicate feature values into a single point using 
as label the weighted
+   * average of the labels of the points with duplicate feature values. All 
points for a unique
+   * feature values are aggregated as:
+   *
+   *   - Aggregated label is the weighted average of all labels
+   *   - Aggregated feature is the weighted average of all equal features[1]
+   *   - Aggregated weight is the sum of all weights
+   *
+   * [1] Note: It is possible that feature values to be equal up to a 
resolution due to
+   * representation errors, since we cannot know which feature value to use in 
that case, we
+   * compute the weighted average of the features. Ideally, all feature values 
will be equal and
+   * the 

[spark] branch master updated: [SPARK-41408][BUILD] Upgrade scala-maven-plugin to 4.8.0

2022-12-08 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a3a755d3613 [SPARK-41408][BUILD] Upgrade scala-maven-plugin to 4.8.0
a3a755d3613 is described below

commit a3a755d36136295473a4873a6df33c295c29213e
Author: yangjie01 
AuthorDate: Thu Dec 8 07:41:19 2022 -0600

[SPARK-41408][BUILD] Upgrade scala-maven-plugin to 4.8.0

### What changes were proposed in this pull request?
This PR aims to upgrade scala-maven-plugin to 4.8.0.

### Why are the changes needed?
This version upgrades Zinc to 1.8.0 and includes some bug fixes; the full list of 
changes from 4.7.2 is as follows:

- https://github.com/davidB/scala-maven-plugin/compare/4.7.2...4.8.0

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #38936 from LuciferYang/sm-plugin-480.

Authored-by: yangjie01 
Signed-off-by: Sean Owen 
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index b2e5979f467..a93954b3c5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,7 +175,7 @@
   errors building different Hadoop versions.
   See: SPARK-36547, SPARK-38394.
-->
-4.7.2
+4.8.0
 
 true
 true


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41435][SQL] Change to call `invalidFunctionArgumentsError` for `curdate()` when `expressions` is not empty

2022-12-08 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d5e32757429 [SPARK-41435][SQL] Change to call 
`invalidFunctionArgumentsError` for `curdate()`  when `expressions` is not empty
d5e32757429 is described below

commit d5e327574290e1da92d109081c500782d5a3bc21
Author: yangjie01 
AuthorDate: Thu Dec 8 15:40:18 2022 +0300

[SPARK-41435][SQL] Change to call `invalidFunctionArgumentsError` for 
`curdate()`  when `expressions` is not empty

### What changes were proposed in this pull request?
This PR changes `curdate()` to call `invalidFunctionArgumentsError` instead of 
`invalidFunctionArgumentNumberError` when `expressions` is not empty, so 
`curdate()` throws an `AnalysisException` with error class `WRONG_NUM_ARGS` 
when the input arguments are not empty.

### Why are the changes needed?
`WRONG_NUM_ARGS` is a more appropriate error class
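
As a hedged illustration (assuming a Spark build containing this patch and an 
active SparkSession named `spark`), the new error can be observed from PySpark:

```python
from pyspark.sql.utils import AnalysisException

try:
    spark.sql("SELECT curdate(1)").collect()
except AnalysisException as e:
    # The error class is now WRONG_NUM_ARGS (expected 0 args, got 1).
    print(e)
```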

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add new test case

Closes #38960 from LuciferYang/curdate-err-msg.

Authored-by: yangjie01 
Signed-off-by: Max Gekk 
---
 .../catalyst/expressions/datetimeExpressions.scala |  4 ++--
 .../src/test/resources/sql-tests/inputs/date.sql   |  1 +
 .../resources/sql-tests/results/ansi/date.sql.out  | 23 ++
 .../test/resources/sql-tests/results/date.sql.out  | 23 ++
 .../sql-tests/results/datetime-legacy.sql.out  | 23 ++
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 13 
 6 files changed, 85 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index e8bad46e84a..3e89dfe39ce 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -171,8 +171,8 @@ object CurDateExpressionBuilder extends ExpressionBuilder {
 if (expressions.isEmpty) {
   CurrentDate()
 } else {
-  throw QueryCompilationErrors.invalidFunctionArgumentNumberError(
-Seq.empty, funcName, expressions.length)
+  throw QueryCompilationErrors.invalidFunctionArgumentsError(
+funcName, "0", expressions.length)
 }
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/date.sql 
b/sql/core/src/test/resources/sql-tests/inputs/date.sql
index ab57c7c754c..163855069f0 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/date.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/date.sql
@@ -19,6 +19,7 @@ select date'2021-4294967297-11';
 select current_date = current_date;
 -- under ANSI mode, `current_date` can't be a function name.
 select current_date() = current_date();
+select curdate(1);
 
 -- conversions between date and unix_date (number of days from epoch)
 select DATE_FROM_UNIX_DATE(0), DATE_FROM_UNIX_DATE(1000), 
DATE_FROM_UNIX_DATE(null);
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out 
b/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out
index 9ddbaec4f99..d0f5b02c916 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out
@@ -135,6 +135,29 @@ struct<(current_date() = current_date()):boolean>
 true
 
 
+-- !query
+select curdate(1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS",
+  "messageParameters" : {
+"actualNum" : "1",
+"expectedNum" : "0",
+"functionName" : "`curdate`"
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 17,
+"fragment" : "curdate(1)"
+  } ]
+}
+
+
 -- !query
 select DATE_FROM_UNIX_DATE(0), DATE_FROM_UNIX_DATE(1000), 
DATE_FROM_UNIX_DATE(null)
 -- !query schema
diff --git a/sql/core/src/test/resources/sql-tests/results/date.sql.out 
b/sql/core/src/test/resources/sql-tests/results/date.sql.out
index 9e427adb052..434e3c7abd3 100644
--- a/sql/core/src/test/resources/sql-tests/results/date.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/date.sql.out
@@ -121,6 +121,29 @@ struct<(current_date() = current_date()):boolean>
 true
 
 
+-- !query
+select curdate(1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS",
+  "messageParameters" : {
+"actualNum" : "1",
+"expectedNum" : "0",
+"functionName" : "`curdate`"
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+

[spark] branch master updated: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C

2022-12-08 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d53ba6276f4 [SPARK-41436][CONNECT][PYTHON] Implement `collection` 
functions: A~C
d53ba6276f4 is described below

commit d53ba6276f407b6e090c9610b85a56d047e56f73
Author: Ruifeng Zheng 
AuthorDate: Thu Dec 8 16:59:13 2022 +0800

[SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C

### What changes were proposed in this pull request?
 Implement the [`collection` functions](https://github.com/apache/spark/blob/master/python/docs/source/reference/pyspark.sql/functions.rst#collection-functions) alphabetically; this PR covers `A` ~ `C`, except (a quick parity illustration in classic PySpark follows this list):

- `aggregate`, `array_sort` - these need support for LambdaFunction Expression
- the `int count` in `array_repeat` - this needs datatype support in 
LiteralExpression in the Python Client
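
A quick parity illustration in classic PySpark (not the Connect client; assumes 
an active SparkSession `spark`), using two of the A~C collection functions this 
PR mirrors:

```python
from pyspark.sql import functions as F

df = spark.range(1).select(F.array(F.lit(1), F.lit(2), F.lit(3)).alias("xs"))
df.select(F.array_contains("xs", 2).alias("has_2")).show()
# +-----+
# |has_2|
# +-----+
# | true|
# +-----+
```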

### Why are the changes needed?

For API coverage

### Does this PR introduce _any_ user-facing change?
new APIs

### How was this patch tested?
added UT

Closes #38961 from zhengruifeng/connect_function_collect_1.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/functions.py| 608 -
 .../sql/tests/connect/test_connect_function.py | 156 ++
 2 files changed, 763 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/functions.py 
b/python/pyspark/sql/connect/functions.py
index 8b36647ae5b..7eb17bd89ac 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -90,7 +90,10 @@ column = col
 
 
 def lit(col: Any) -> Column:
-return Column(LiteralExpression(col))
+if isinstance(col, Column):
+return col
+else:
+return Column(LiteralExpression(col))
 
 
 # def bitwiseNOT(col: "ColumnOrName") -> Column:
@@ -3208,6 +3211,609 @@ def variance(col: "ColumnOrName") -> Column:
 return var_samp(col)
 
 
+# Collection Functions
+
+
+# TODO(SPARK-41434): need to support LambdaFunction Expression first
+# def aggregate(
+# col: "ColumnOrName",
+# initialValue: "ColumnOrName",
+# merge: Callable[[Column, Column], Column],
+# finish: Optional[Callable[[Column], Column]] = None,
+# ) -> Column:
+# """
+# Applies a binary operator to an initial state and all elements in the 
array,
+# and reduces this to a single state. The final state is converted into 
the final result
+# by applying a finish function.
+#
+# Both functions can use methods of :class:`~pyspark.sql.Column`, 
functions defined in
+# :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
+# Python ``UserDefinedFunctions`` are not supported
+# (`SPARK-27052 `__).
+#
+# .. versionadded:: 3.1.0
+#
+# Parameters
+# --
+# col : :class:`~pyspark.sql.Column` or str
+# name of column or expression
+# initialValue : :class:`~pyspark.sql.Column` or str
+# initial value. Name of column or expression
+# merge : function
+# a binary function ``(acc: Column, x: Column) -> Column...`` 
returning expression
+# of the same type as ``zero``
+# finish : function
+# an optional unary function ``(x: Column) -> Column: ...``
+# used to convert accumulated value.
+#
+# Returns
+# ---
+# :class:`~pyspark.sql.Column`
+# final value after aggregate function is applied.
+#
+# Examples
+# 
+# >>> df = spark.createDataFrame([(1, [20.0, 4.0, 2.0, 6.0, 10.0])], 
("id", "values"))
+# >>> df.select(aggregate("values", lit(0.0), lambda acc, x: acc + 
x).alias("sum")).show()
+# ++
+# | sum|
+# ++
+# |42.0|
+# ++
+#
+# >>> def merge(acc, x):
+# ... count = acc.count + 1
+# ... sum = acc.sum + x
+# ... return struct(count.alias("count"), sum.alias("sum"))
+# >>> df.select(
+# ... aggregate(
+# ... "values",
+# ... struct(lit(0).alias("count"), lit(0.0).alias("sum")),
+# ... merge,
+# ... lambda acc: acc.sum / acc.count,
+# ... ).alias("mean")
+# ... ).show()
+# ++
+# |mean|
+# ++
+# | 8.4|
+# ++
+# """
+# if finish is not None:
+# return _invoke_higher_order_function("ArrayAggregate", [col, 
initialValue],
+#   [merge, finish])
+#
+# else:
+# return _invoke_higher_order_function("ArrayAggregate", [col, 
initialValue],
+#   [merge])
+
+
+def array(*cols: Union["ColumnOrName", List["ColumnOrName"], 
Tuple["ColumnOrName", ...]]) -> Column:
+"""Creates a new array column.
+
+.. versionadded:: 3.4.0
+
+  

[spark] branch master updated (786ccf9a10c -> 5c8949ca4a6)

2022-12-08 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 786ccf9a10c [SPARK-41445][CONNECT] Implement DataFrameReader.parquet
 add 5c8949ca4a6 [SPARK-41412][CONNECT] Implement `Column.cast`

No new revisions were added by this update.

Summary of changes:
 .../main/protobuf/spark/connect/expressions.proto  |   6 +-
 .../org/apache/spark/sql/connect/dsl/package.scala |  12 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  13 +-
 .../connect/planner/SparkConnectProtoSuite.scala   |   4 +
 python/pyspark/sql/connect/client.py   |  87 +
 python/pyspark/sql/connect/column.py   |  48 ++-
 .../pyspark/sql/connect/proto/expressions_pb2.py   |  56 
 .../pyspark/sql/connect/proto/expressions_pb2.pyi  |  36 +-
 python/pyspark/sql/connect/types.py| 143 +
 .../sql/tests/connect/test_connect_column.py   |  39 ++
 10 files changed, 319 insertions(+), 125 deletions(-)
 create mode 100644 python/pyspark/sql/connect/types.py


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41445][CONNECT] Implement DataFrameReader.parquet

2022-12-08 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 786ccf9a10c [SPARK-41445][CONNECT] Implement DataFrameReader.parquet
786ccf9a10c is described below

commit 786ccf9a10c92f41bfb7b0eb022e3126918730ea
Author: Hyukjin Kwon 
AuthorDate: Thu Dec 8 00:41:23 2022 -0800

[SPARK-41445][CONNECT] Implement DataFrameReader.parquet

### What changes were proposed in this pull request?

This PR implements `DataFrameReader.parquet` alias in Spark Connect.

### Why are the changes needed?

For API feature parity.

### Does this PR introduce _any_ user-facing change?

This PR adds a user-facing API but Spark Connect has not been released yet.

### How was this patch tested?

Unittest was added.
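
A hedged usage sketch mirroring the added unit test (the session handles below 
are hypothetical names, not part of this patch):

```python
import tempfile

# `spark` is a classic SparkSession used to write the file;
# `connect` is a Spark Connect session used to read it back.
with tempfile.TemporaryDirectory() as d:
    spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}]).write.mode(
        "overwrite"
    ).format("parquet").save(d)
    print(connect.read.parquet(d).toPandas())
```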

Closes #38977 from HyukjinKwon/parquet-read.

Authored-by: Hyukjin Kwon 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/connect/readwriter.py   | 57 ++
 .../sql/tests/connect/test_connect_basic.py| 12 +
 2 files changed, 69 insertions(+)

diff --git a/python/pyspark/sql/connect/readwriter.py 
b/python/pyspark/sql/connect/readwriter.py
index 64ee3973f5f..470417b6a28 100644
--- a/python/pyspark/sql/connect/readwriter.py
+++ b/python/pyspark/sql/connect/readwriter.py
@@ -273,6 +273,63 @@ class DataFrameReader(OptionUtils):
 )
 return self.load(path=path, format="json", schema=schema)
 
+def parquet(self, path: str, **options: "OptionalPrimitiveType") -> 
"DataFrame":
+"""
+Loads Parquet files, returning the result as a :class:`DataFrame`.
+
+.. versionadded:: 3.4.0
+
+Parameters
+--
+path : str
+
+Other Parameters
+
+**options
+For the extra options, refer to
+`Data Source Option 
`_
+for the version you use.
+
+.. # noqa
+
+Examples
+
+Write a DataFrame into a Parquet file and read it back.
+
+>>> import tempfile
+>>> with tempfile.TemporaryDirectory() as d:
+... # Write a DataFrame into a Parquet file
+... spark.createDataFrame(
+... [{"age": 100, "name": "Hyukjin Kwon"}]
+... ).write.mode("overwrite").format("parquet").save(d)
+...
+... # Read the Parquet file as a DataFrame.
+... spark.read.parquet(d).show()
++---++
+|age|name|
++---++
+|100|Hyukjin Kwon|
++---++
+"""
+mergeSchema = options.get("mergeSchema", None)
+pathGlobFilter = options.get("pathGlobFilter", None)
+modifiedBefore = options.get("modifiedBefore", None)
+modifiedAfter = options.get("modifiedAfter", None)
+recursiveFileLookup = options.get("recursiveFileLookup", None)
+datetimeRebaseMode = options.get("datetimeRebaseMode", None)
+int96RebaseMode = options.get("int96RebaseMode", None)
+self._set_opts(
+mergeSchema=mergeSchema,
+pathGlobFilter=pathGlobFilter,
+recursiveFileLookup=recursiveFileLookup,
+modifiedBefore=modifiedBefore,
+modifiedAfter=modifiedAfter,
+datetimeRebaseMode=datetimeRebaseMode,
+int96RebaseMode=int96RebaseMode,
+)
+
+return self.load(path=path, format="parquet")
+
 
 class DataFrameWriter(OptionUtils):
 """
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 3681a9980b9..ae3813b43ae 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -138,6 +138,18 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 self.spark.read.json(path=d, 
primitivesAsString=True).toPandas(),
 )
 
+def test_paruqet(self):
+# SPARK-41445: Implement DataFrameReader.paruqet
+with tempfile.TemporaryDirectory() as d:
+# Write a DataFrame into a JSON file
+self.spark.createDataFrame([{"age": 100, "name": "Hyukjin 
Kwon"}]).write.mode(
+"overwrite"
+).format("parquet").save(d)
+# Read the Parquet file as a DataFrame.
+self.assert_eq(
+self.connect.read.parquet(d).toPandas(), 
self.spark.read.parquet(d).toPandas()
+)
+
 def test_join_condition_column_list_columns(self):
 left_connect_df = self.connect.read.table(self.tbl_name)
 right_connect_df = self.connect.read.table(self.tbl_name2)



[spark] branch branch-3.3 updated: [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset

2022-12-08 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 02f32ee358c [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset
02f32ee358c is described below

commit 02f32ee358cc0a398aa7321bc5613cb92b306f6f
Author: wecharyu 
AuthorDate: Thu Dec 8 17:12:30 2022 +0900

[SPARK-41375][SS] Avoid empty latest KafkaSourceOffset

### What changes were proposed in this pull request?

Add the empty offset filter in `latestOffset()` for Kafka Source, so that 
offset remains unchanged if Kafka provides no topic partition during fetch.

### Why are the changes needed?

KafkaOffsetReader may fetch an empty partition set in some extreme cases, such 
as fetching partitions while the Kafka cluster is reassigning them. This 
produces an empty `PartitionOffsetMap` (even though the topic-partitions are 
actually unchanged), which is then stored in `committedOffsets` after `runBatch()`.

In the next batch, partitions are fetched normally and the actual offsets are 
obtained, but when the batch's data is fetched in 
`KafkaOffsetReaderAdmin#getOffsetRangesFromResolvedOffsets()`, all partitions in 
endOffsets are treated as new partitions because the startOffsets map is empty. 
These "new partitions" then fetch the earliest offsets, which causes data 
duplication, as sketched below.
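
A simplified illustration (plain Python with hypothetical names, not the Kafka 
connector code) of why an empty committed start-offset map causes duplication:

```python
def offset_ranges(start_offsets, end_offsets, earliest_offsets):
    """Partitions missing from start_offsets look 'new' and are re-read
    from the earliest available offset."""
    return [
        (tp, start_offsets.get(tp, earliest_offsets[tp]), end)
        for tp, end in end_offsets.items()
    ]

# With an accidentally-empty committed map, every partition restarts at its
# earliest offset, re-reading data that was already processed:
print(offset_ranges({}, {"t-0": 30}, {"t-0": 0}))   # [('t-0', 0, 30)]
```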

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add a unit test.

Closes #38898 from wecharyu/SPARK-41375.

Authored-by: wecharyu 
Signed-off-by: Jungtaek Lim 
(cherry picked from commit 043475a87844f11c252fb0ebab469148ae6985d7)
Signed-off-by: Jungtaek Lim 
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  7 ++--
 .../apache/spark/sql/kafka010/KafkaSource.scala|  4 +--
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 39 ++
 3 files changed, 43 insertions(+), 7 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 77bc658a1ef..a371d25899d 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -85,8 +85,6 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
 
-  private var endPartitionOffsets: KafkaSourceOffset = _
-
   private var latestPartitionOffsets: PartitionOffsetMap = _
 
   private var allDataForTriggerAvailableNow: PartitionOffsetMap = _
@@ -114,7 +112,7 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   override def reportLatestOffset(): Offset = {
-KafkaSourceOffset(latestPartitionOffsets)
+    Option(KafkaSourceOffset(latestPartitionOffsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   override def latestOffset(): Offset = {
@@ -163,8 +161,7 @@ private[kafka010] class KafkaMicroBatchStream(
   }.getOrElse(latestPartitionOffsets)
 }
 
-endPartitionOffsets = KafkaSourceOffset(offsets)
-endPartitionOffsets
+    Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & 
maxTriggerDelay */
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index c82fda85eb4..b84643533f8 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -177,7 +177,7 @@ private[kafka010] class KafkaSource(
   kafkaReader.fetchLatestOffsets(currentOffsets)
 }
 
-latestPartitionOffsets = Some(latest)
+latestPartitionOffsets = if (latest.isEmpty) None else Some(latest)
 
 val limits: Seq[ReadLimit] = limit match {
   case rows: CompositeReadLimit => rows.getReadLimits
@@ -213,7 +213,7 @@ private[kafka010] class KafkaSource(
 }
 currentPartitionOffsets = Some(offsets)
 logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
-KafkaSourceOffset(offsets)
+    Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & 
maxTriggerDelay */
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

[spark] branch master updated: [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset

2022-12-08 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 043475a8784 [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset
043475a8784 is described below

commit 043475a87844f11c252fb0ebab469148ae6985d7
Author: wecharyu 
AuthorDate: Thu Dec 8 17:12:30 2022 +0900

[SPARK-41375][SS] Avoid empty latest KafkaSourceOffset

### What changes were proposed in this pull request?

Add the empty offset filter in `latestOffset()` for Kafka Source, so that 
offset remains unchanged if Kafka provides no topic partition during fetch.

### Why are the changes needed?

KafkaOffsetReader may fetch an empty partition set in some extreme cases, such 
as fetching partitions while the Kafka cluster is reassigning them. This 
produces an empty `PartitionOffsetMap` (even though the topic-partitions are 
actually unchanged), which is then stored in `committedOffsets` after `runBatch()`.

In the next batch, partitions are fetched normally and the actual offsets are 
obtained, but when the batch's data is fetched in 
`KafkaOffsetReaderAdmin#getOffsetRangesFromResolvedOffsets()`, all partitions in 
endOffsets are treated as new partitions because the startOffsets map is empty. 
These "new partitions" then fetch the earliest offsets, which causes data 
duplication.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add a unit test.

Closes #38898 from wecharyu/SPARK-41375.

Authored-by: wecharyu 
Signed-off-by: Jungtaek Lim 
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  7 ++--
 .../apache/spark/sql/kafka010/KafkaSource.scala|  4 +--
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 39 ++
 3 files changed, 43 insertions(+), 7 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 77bc658a1ef..a371d25899d 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -85,8 +85,6 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
 
-  private var endPartitionOffsets: KafkaSourceOffset = _
-
   private var latestPartitionOffsets: PartitionOffsetMap = _
 
   private var allDataForTriggerAvailableNow: PartitionOffsetMap = _
@@ -114,7 +112,7 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   override def reportLatestOffset(): Offset = {
-KafkaSourceOffset(latestPartitionOffsets)
+    Option(KafkaSourceOffset(latestPartitionOffsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   override def latestOffset(): Offset = {
@@ -163,8 +161,7 @@ private[kafka010] class KafkaMicroBatchStream(
   }.getOrElse(latestPartitionOffsets)
 }
 
-endPartitionOffsets = KafkaSourceOffset(offsets)
-endPartitionOffsets
+    Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & 
maxTriggerDelay */
diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index a7840ef1055..a9ee5b64625 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -177,7 +177,7 @@ private[kafka010] class KafkaSource(
   kafkaReader.fetchLatestOffsets(currentOffsets)
 }
 
-latestPartitionOffsets = Some(latest)
+latestPartitionOffsets = if (latest.isEmpty) None else Some(latest)
 
 val limits: Seq[ReadLimit] = limit match {
   case rows: CompositeReadLimit => rows.getReadLimits
@@ -213,7 +213,7 @@ private[kafka010] class KafkaSource(
 }
 currentPartitionOffsets = Some(offsets)
 logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
-KafkaSourceOffset(offsets)
+    Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & 
maxTriggerDelay */
diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index af66ecd21c0..bf0e72cd32b 100644
---