(spark) branch master updated: [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations

2023-12-04 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1d80d80a56c4 [SPARK-46249][SS] Require instance lock for acquiring 
RocksDB metrics to prevent race with background operations
1d80d80a56c4 is described below

commit 1d80d80a56c418f841e282ad753fad6671c3baae
Author: Anish Shrigondekar 
AuthorDate: Tue Dec 5 15:00:08 2023 +0900

[SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to 
prevent race with background operations

### What changes were proposed in this pull request?
Require instance lock for acquiring RocksDB metrics to prevent race with 
background operations

### Why are the changes needed?
The changes are needed to avoid races where the stateful operator tries to 
set store metrics after the commit, while the DB instance has already been 
closed/aborted/reloaded by a background operation.
We have seen a few query failures with the following stack trace for this 
reason:
```
org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 3 in stage 531.0 failed 1 times, most recent failure: Lost task 3.0 in stage 531.0 (TID 1544) (ip-10-110-29-251.us-west-2.compute.internal executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.execution.streaming.state.RocksDB.getDBProperty(RocksDB.scala:838)
    at org.apache.spark.sql.execution.streaming.state.RocksDB.metrics(RocksDB.scala:678)
    at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.metrics(RocksDBStateStoreProvider.scala:137)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics(statefulOperators.scala:198)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics$(statefulOperators.scala:197)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.setStoreMetrics(statefulOperators.scala:495)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.close(statefulOperators.scala:626)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:498)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1743)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:552)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:482)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:557)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:445)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
```
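
The pattern of the fix is that metrics are read only while holding the store's 
instance lock, so the read cannot interleave with a concurrent close/abort/reload. 
A minimal sketch of that pattern in Python (illustrative only; the actual change 
is in `RocksDB.scala`, and the class and field names below are hypothetical):

```
import threading
from typing import Optional

class StateStoreLike:
    """Hypothetical stand-in for the RocksDB state store wrapper."""

    def __init__(self) -> None:
        self._lock = threading.RLock()  # plays the role of the instance lock
        self._db: Optional[dict] = {"rocksdb.estimate-num-keys": 42}

    def close(self) -> None:
        # A background maintenance task may close/reload the DB at any time.
        with self._lock:
            self._db = None

    def metrics(self) -> Optional[dict]:
        # Without the lock, self._db could become None between the check and
        # the property reads -- the NullPointerException seen in the trace above.
        with self._lock:
            if self._db is None:
                return None
            return dict(self._db)
```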

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Modified existing unit tests

```
[info] Run completed in 1 minute, 31 seconds.
[info] Total number of tests run: 150
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 150, failed 0, canceled 0, ignored 0, pending 0
[info] All 

(spark) branch master updated: [SPARK-46255][PYTHON][CONNECT] Support complex type -> string conversion

2023-12-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3c8c7ad92cdc [SPARK-46255][PYTHON][CONNECT] Support complex type -> 
string conversion
3c8c7ad92cdc is described below

commit 3c8c7ad92cdc3ae989456d87b0332cb917da7e4e
Author: Ruifeng Zheng 
AuthorDate: Tue Dec 5 13:05:36 2023 +0800

[SPARK-46255][PYTHON][CONNECT] Support complex type -> string conversion

### What changes were proposed in this pull request?
Support complex type -> string conversion

### Why are the changes needed?
to support `list -> str` conversion
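
For illustration, this is the user-visible behavior the change enables, mirroring 
the test added below (a sketch, assuming an active SparkSession named `spark`):

```
from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("income", LongType(), True),
])

# The list [123] is converted to the string "[123]" for the StringType column.
df = spark.createDataFrame([[[123], 120]], schema)
df.show()
# +-----+------+
# | name|income|
# +-----+------+
# |[123]|   120|
# +-----+------+
```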

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44171 from zhengruifeng/py_connect_str_conv.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/conversion.py | 16 
 python/pyspark/sql/tests/test_types.py   | 13 +
 2 files changed, 13 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/sql/connect/conversion.py 
b/python/pyspark/sql/connect/conversion.py
index 550978d02f85..fb5a2d4b17b1 100644
--- a/python/pyspark/sql/connect/conversion.py
+++ b/python/pyspark/sql/connect/conversion.py
@@ -222,22 +222,6 @@ class LocalDataToArrowConversion:
 if value is None:
 return None
 else:
-# only atomic types are supported
-assert isinstance(
-value,
-(
-bool,
-int,
-float,
-str,
-bytes,
-bytearray,
-decimal.Decimal,
-datetime.date,
-datetime.datetime,
-datetime.timedelta,
-),
-)
 if isinstance(value, bool):
 # To match the PySpark which convert bool to string in
 # the JVM side (python.EvaluatePython.makeFromJava)
diff --git a/python/pyspark/sql/tests/test_types.py 
b/python/pyspark/sql/tests/test_types.py
index b038cf6ce5ba..a07309d1dff9 100644
--- a/python/pyspark/sql/tests/test_types.py
+++ b/python/pyspark/sql/tests/test_types.py
@@ -507,6 +507,19 @@ class TypesTestsMixin:
 self.assertEqual(1, row.asDict()["l"][0].a)
 self.assertEqual(1.0, row.asDict()["d"]["key"].c)
 
+def test_convert_list_to_str(self):
+data = [[[123], 120]]
+schema = StructType(
+[
+StructField("name", StringType(), True),
+StructField("income", LongType(), True),
+]
+)
+df = self.spark.createDataFrame(data, schema)
+self.assertEqual(df.schema, schema)
+self.assertEqual(df.count(), 1)
+self.assertEqual(df.head(), Row(name="[123]", income=120))
+
 def test_udt(self):
 from pyspark.sql.types import _parse_datatype_json_string, 
_infer_type, _make_type_verifier
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46254][PYTHON] Remove stale Python 3.8/3.7 version checking

2023-12-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f3e113b687be [SPARK-46254][PYTHON] Remove stale Python 3.8/3.7 version 
checking
f3e113b687be is described below

commit f3e113b687be328ba0d318ec03977aba3036b86f
Author: Hyukjin Kwon 
AuthorDate: Tue Dec 5 11:44:37 2023 +0900

[SPARK-46254][PYTHON] Remove stale Python 3.8/3.7 version checking

### What changes were proposed in this pull request?

This PR proposes to remove stale Python 3.8/3.7 version checking in the 
codebase.

### Why are the changes needed?

To remove unnecessary version comparison

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested the version comparison.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44169 from HyukjinKwon/remove-python38.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/__init__.py   | 21 ++---
 python/pyspark/pandas/frame.py  |  4 +---
 .../pandas/tests/computation/test_apply_func.py |  5 +++--
 python/pyspark/pandas/tests/test_typedef.py |  6 --
 python/pyspark/pandas/typedef/typehints.py  |  7 +--
 5 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/python/pyspark/pandas/__init__.py 
b/python/pyspark/pandas/__init__.py
index 44365c8e6b88..65366f544092 100644
--- a/python/pyspark/pandas/__init__.py
+++ b/python/pyspark/pandas/__init__.py
@@ -122,17 +122,16 @@ def _auto_patch_pandas() -> None:
 _frame_has_class_getitem = hasattr(pd.DataFrame, "__class_getitem__")
 _series_has_class_getitem = hasattr(pd.Series, "__class_getitem__")
 
-if sys.version_info >= (3, 7):
-# Just in case pandas implements '__class_getitem__' later.
-if not _frame_has_class_getitem:
-pd.DataFrame.__class_getitem__ = (  # type: ignore[attr-defined]
-lambda params: DataFrame.__class_getitem__(params)
-)
-
-if not _series_has_class_getitem:
-pd.Series.__class_getitem__ = (  # type: ignore[attr-defined]
-lambda params: Series.__class_getitem__(params)
-)
+# Just in case pandas implements '__class_getitem__' later.
+if not _frame_has_class_getitem:
+pd.DataFrame.__class_getitem__ = (  # type: ignore[attr-defined]
+lambda params: DataFrame.__class_getitem__(params)
+)
+
+if not _series_has_class_getitem:
+pd.Series.__class_getitem__ = (  # type: ignore[attr-defined]
+lambda params: Series.__class_getitem__(params)
+)
 
 
 _auto_patch_spark()
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index a54316dffeb4..9846dc0ae10b 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -2103,9 +2103,7 @@ class DataFrame(Frame, Generic[T]):
 v = [row[c] for c in data_spark_column_names]
 return k, v
 
-can_return_named_tuples = sys.version_info >= (3, 7) or 
len(self.columns) + index < 255
-
-if name is not None and can_return_named_tuples:
+if name is not None:
 itertuple = namedtuple(name, fields, rename=True)  # type: 
ignore[misc]
 for k, v in map(
 extract_kv_from_spark_row,
diff --git a/python/pyspark/pandas/tests/computation/test_apply_func.py 
b/python/pyspark/pandas/tests/computation/test_apply_func.py
index 93d9d56a479a..00b14441991a 100644
--- a/python/pyspark/pandas/tests/computation/test_apply_func.py
+++ b/python/pyspark/pandas/tests/computation/test_apply_func.py
@@ -23,6 +23,7 @@ import numpy as np
 import pandas as pd
 
 from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
 from pyspark.pandas.config import option_context
 from pyspark.testing.pandasutils import ComparisonTestBase
 from pyspark.testing.sqlutils import SQLTestUtils
@@ -252,8 +253,8 @@ class FrameApplyFunctionMixin:
 actual.columns = ["a", "b"]
 self.assert_eq(actual, pdf)
 
-# For NumPy typing, NumPy version should be 1.21+ and Python version 
should be 3.8+
-if sys.version_info >= (3, 8):
+# For NumPy typing, NumPy version should be 1.21+
+if LooseVersion(np.__version__) >= LooseVersion("1.21"):
 import numpy.typing as ntp
 
 psdf = ps.from_pandas(pdf)
diff --git a/python/pyspark/pandas/tests/test_typedef.py 
b/python/pyspark/pandas/tests/test_typedef.py
index 52913fb65f09..e8095ce4ba06 100644
--- a/python/pyspark/pandas/tests/test_typedef.py
+++ b/python/pyspark/pandas/tests/test_typedef.py
@@ -25,6 +25,8 @@ import pandas
 import 

(spark) branch master updated: [SPARK-46043][SQL] Support create table using DSv2 sources

2023-12-04 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5fec76dc8db2 [SPARK-46043][SQL] Support create table using DSv2 sources
5fec76dc8db2 is described below

commit 5fec76dc8db2499b0a9d76231f9a250871d59658
Author: allisonwang-db 
AuthorDate: Tue Dec 5 09:35:14 2023 +0800

[SPARK-46043][SQL] Support create table using DSv2 sources

### What changes were proposed in this pull request?

This PR supports `CREATE TABLE ... USING source` for DSv2 sources.

### Why are the changes needed?

To support creating DSv2 tables in SQL. Currently, creating the table can 
succeed, but selecting from a DSv2 table created in SQL fails with this error:
```
org.apache.spark.sql.AnalysisException: 
org.apache.spark.sql.connector.SimpleDataSourceV2 is not a valid Spark SQL Data 
Source.
```
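
For illustration, the flow this PR enables looks roughly like the following 
(a sketch only; `com.example.MyDataSourceV2` is a hypothetical provider class 
that must be on the classpath):

```
# Create a table backed by a DSv2 provider directly in SQL ...
spark.sql("""
    CREATE TABLE my_dsv2_table (id BIGINT, value STRING)
    USING com.example.MyDataSourceV2
""")

# ... and read it back. Before this change the read failed with
# "... is not a valid Spark SQL Data Source."
spark.sql("SELECT * FROM my_dsv2_table").show()
```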

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43949 from allisonwang-db/spark-46043-dsv2-create-table.

Authored-by: allisonwang-db 
Signed-off-by: Wenchen Fan 
---
 .../src/main/resources/error/error-classes.json|  13 +++
 ...-cannot-create-data-source-table-error-class.md |  32 ++
 docs/sql-error-conditions.md   |   8 ++
 .../spark/sql/catalyst/util/QuotingUtils.scala |   8 ++
 .../sql/connector/catalog/CatalogV2Implicits.scala |   6 ++
 .../datasources/v2/DataSourceV2Relation.scala  |  19 +++-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  14 +--
 .../datasources/v2/DataSourceV2Utils.scala |  16 +++
 .../datasources/v2/V2SessionCatalog.scala  | 104 +---
 .../connector/JavaSchemaRequiredDataSource.java|   4 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  15 +--
 .../spark/sql/connector/DataSourceV2Suite.scala| 108 -
 .../spark/sql/connector/FakeV2Provider.scala   |  63 
 .../spark/sql/connector/InsertIntoTests.scala  |  12 ++-
 .../sql/connector/TestV2SessionCatalogBase.scala   |   5 +-
 .../spark/sql/connector/V1WriteFallbackSuite.scala |   3 +-
 .../execution/command/PlanResolutionSuite.scala|   2 +-
 17 files changed, 384 insertions(+), 48 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index a808be9510cf..e54d346e1bc1 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -149,6 +149,19 @@
 ],
 "sqlState" : "42846"
   },
+  "CANNOT_CREATE_DATA_SOURCE_TABLE" : {
+"message" : [
+  "Failed to create data source table :"
+],
+"subClass" : {
+  "EXTERNAL_METADATA_UNSUPPORTED" : {
+"message" : [
+  "provider '' does not support external metadata but a 
schema is provided. Please remove the schema when creating the table."
+]
+  }
+},
+"sqlState" : "42KDE"
+  },
   "CANNOT_DECODE_URL" : {
 "message" : [
   "The provided URL cannot be decoded: . Please ensure that the URL 
is properly formatted and try again."
diff --git 
a/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md 
b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
new file mode 100644
index ..f52e4f462bc9
--- /dev/null
+++ b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
@@ -0,0 +1,32 @@
+---
+layout: global
+title: CANNOT_CREATE_DATA_SOURCE_TABLE error class
+displayTitle: CANNOT_CREATE_DATA_SOURCE_TABLE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+[SQLSTATE: 
42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to create data source table ``:
+
+This error class has the following derived error classes:
+
+## EXTERNAL_METADATA_UNSUPPORTED
+
+provider '``' 

(spark) branch master updated: Revert "[SPARK-46213][PYTHON] Introduce `PySparkImportError` for error framework"

2023-12-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7f59565b9fc1 Revert "[SPARK-46213][PYTHON] Introduce 
`PySparkImportError` for error framework"
7f59565b9fc1 is described below

commit 7f59565b9fc19c496bc7600e168650e7663c0065
Author: Hyukjin Kwon 
AuthorDate: Tue Dec 5 10:29:25 2023 +0900

Revert "[SPARK-46213][PYTHON] Introduce `PySparkImportError` for error 
framework"

This reverts commit 75b0eb2d601763847507a5e715b3732db004544a.
---
 python/docs/source/reference/pyspark.errors.rst |  1 -
 python/pyspark/errors/__init__.py   |  2 --
 python/pyspark/errors/error_classes.py  | 10 ---
 python/pyspark/errors/exceptions/base.py|  6 
 python/pyspark/sql/connect/utils.py | 36 +++
 python/pyspark/sql/pandas/utils.py  | 38 +++--
 6 files changed, 21 insertions(+), 72 deletions(-)

diff --git a/python/docs/source/reference/pyspark.errors.rst 
b/python/docs/source/reference/pyspark.errors.rst
index a4997506b41e..56fdde2584c5 100644
--- a/python/docs/source/reference/pyspark.errors.rst
+++ b/python/docs/source/reference/pyspark.errors.rst
@@ -44,7 +44,6 @@ Classes
 PySparkRuntimeError
 PySparkTypeError
 PySparkValueError
-PySparkImportError
 PySparkIndexError
 PythonException
 QueryExecutionException
diff --git a/python/pyspark/errors/__init__.py 
b/python/pyspark/errors/__init__.py
index 07033d216432..0a55084a4a59 100644
--- a/python/pyspark/errors/__init__.py
+++ b/python/pyspark/errors/__init__.py
@@ -39,7 +39,6 @@ from pyspark.errors.exceptions.base import (  # noqa: F401
 SparkNoSuchElementException,
 PySparkTypeError,
 PySparkValueError,
-PySparkImportError,
 PySparkIndexError,
 PySparkAttributeError,
 PySparkRuntimeError,
@@ -71,7 +70,6 @@ __all__ = [
 "SparkNoSuchElementException",
 "PySparkTypeError",
 "PySparkValueError",
-"PySparkImportError",
 "PySparkIndexError",
 "PySparkAttributeError",
 "PySparkRuntimeError",
diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index 8ca73ca85de6..e0ca8a938ec2 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -682,11 +682,6 @@ ERROR_CLASSES_JSON = """
   "Only a single trigger is allowed."
 ]
   },
-  "PACKAGE_NOT_INSTALLED" : {
-"message" : [
-  " >=  must be installed; however, it was 
not found."
-]
-  },
   "PIPE_FUNCTION_EXITED" : {
 "message" : [
   "Pipe function `` exited with error code ."
@@ -943,11 +938,6 @@ ERROR_CLASSES_JSON = """
   " is not supported."
 ]
   },
-  "UNSUPPORTED_PACKAGE_VERSION" : {
-"message" : [
-  " >=  must be installed; however, your 
version is ."
-]
-  },
   "UNSUPPORTED_PARAM_TYPE_FOR_HIGHER_ORDER_FUNCTION" : {
 "message" : [
   "Function `` should use only POSITIONAL or POSITIONAL OR 
KEYWORD arguments."
diff --git a/python/pyspark/errors/exceptions/base.py 
b/python/pyspark/errors/exceptions/base.py
index 0f4001483b7f..e7f1e4386d7a 100644
--- a/python/pyspark/errors/exceptions/base.py
+++ b/python/pyspark/errors/exceptions/base.py
@@ -260,12 +260,6 @@ class PySparkPicklingError(PySparkException, 
PicklingError):
 """
 
 
-class PySparkImportError(PySparkException, ImportError):
-"""
-Wrapper class for ImportError to support error classes.
-"""
-
-
 class PySparkKeyError(PySparkException, KeyError):
 """
 Wrapper class for KeyError to support error classes.
diff --git a/python/pyspark/sql/connect/utils.py 
b/python/pyspark/sql/connect/utils.py
index 88f26202b0b2..fd85d75060b5 100644
--- a/python/pyspark/sql/connect/utils.py
+++ b/python/pyspark/sql/connect/utils.py
@@ -18,7 +18,6 @@ import sys
 
 from pyspark.loose_version import LooseVersion
 from pyspark.sql.pandas.utils import require_minimum_pandas_version, 
require_minimum_pyarrow_version
-from pyspark.errors import PySparkImportError
 
 
 def check_dependencies(mod_name: str) -> None:
@@ -46,21 +45,13 @@ def require_minimum_grpc_version() -> None:
 try:
 import grpc
 except ImportError as error:
-raise PySparkImportError(
-error_class="PACKAGE_NOT_INSTALLED",
-message_parameters={
-"package_name:": "grpcio",
-"minimum_version": str(minimum_grpc_version),
-},
+raise ImportError(
+f"grpcio >= {minimum_grpc_version} must be installed; however, it 
was not found."
 ) from error
 if LooseVersion(grpc.__version__) < LooseVersion(minimum_grpc_version):
-raise PySparkImportError(
-error_class="UNSUPPORTED_PACKAGE_VERSION",
-message_parameters={
-

(spark) branch master updated: [SPARK-46009][SQL][CONNECT] Merge the parse rule of PercentileCont and PercentileDisc into functionCall

2023-12-04 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f1283c126878 [SPARK-46009][SQL][CONNECT] Merge the parse rule of 
PercentileCont and PercentileDisc into functionCall
f1283c126878 is described below

commit f1283c12687853f9cd190f8db69d97abe16a2d88
Author: Jiaan Geng 
AuthorDate: Tue Dec 5 09:28:30 2023 +0800

[SPARK-46009][SQL][CONNECT] Merge the parse rule of PercentileCont and 
PercentileDisc into functionCall

### What changes were proposed in this pull request?
The Spark SQL parser has a special rule to parse 
`[percentile_cont|percentile_disc](percentage) WITHIN GROUP (ORDER BY v)`.
We should merge this rule into the generic `functionCall` rule.
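
For reference, the syntax covered by that rule (a usage sketch, assuming an 
active SparkSession named `spark`):

```
# percentile_cont / percentile_disc with WITHIN GROUP, now parsed via the
# generic functionCall rule instead of a dedicated grammar rule.
spark.sql("""
    SELECT
        percentile_cont(0.25) WITHIN GROUP (ORDER BY v) AS p25,
        percentile_disc(0.5)  WITHIN GROUP (ORDER BY v) AS median
    FROM VALUES (1), (2), (3), (4) AS t(v)
""").show()
```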

### Why are the changes needed?
Merge the parse rule of `PercentileCont` and `PercentileDisc` into 
`functionCall`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43910 from beliefer/SPARK-46009.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 common/utils/src/main/resources/error/README.md|   1 +
 .../src/main/resources/error/error-classes.json|  23 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |   7 +-
 ...id-inverse-distribution-function-error-class.md |  40 +++
 docs/sql-error-conditions.md   |   8 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 |   4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala |  29 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   2 +
 .../spark/sql/catalyst/analysis/unresolved.scala   |  32 ++-
 .../aggregate/SupportsOrderingWithinGroup.scala|  27 ++
 .../expressions/aggregate/percentiles.scala|  81 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala |  34 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  24 ++
 .../sql/catalyst/parser/PlanParserSuite.scala  |  49 +++-
 .../sql-functions/sql-expression-schema.md |   2 +
 .../sql-tests/analyzer-results/percentiles.sql.out | 275 +++
 .../resources/sql-tests/inputs/percentiles.sql |  48 
 .../sql-tests/results/percentiles.sql.out  | 299 +
 18 files changed, 922 insertions(+), 63 deletions(-)

diff --git a/common/utils/src/main/resources/error/README.md 
b/common/utils/src/main/resources/error/README.md
index c9fdd84e7442..556a634e9927 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -1309,6 +1309,7 @@ The following SQLSTATEs are collated from:
 |HZ320|HZ   |RDA-specific condition|320 
|version not supported   |RDA/SQL|Y 
  |RDA/SQL  
   |
 |HZ321|HZ   |RDA-specific condition|321 
|TCP/IP error|RDA/SQL|Y 
  |RDA/SQL  
   |
 |HZ322|HZ   |RDA-specific condition|322 
|TLS alert   |RDA/SQL|Y 
  |RDA/SQL  
   |
+|ID001|IM   |Invalid inverse distribution function |001 
|Invalid inverse distribution function   |SQL/Foundation |N 
  |SQL/Foundation PostgreSQL Oracle Snowflake Redshift H2   
   |
 |IM001|IM   |ODBC driver   |001 
|Driver does not support this function   |SQL Server |N 
  |SQL Server   
   |
 |IM002|IM   |ODBC driver   |002 
|Data source name not found and no default driver specified  |SQL Server |N 
  |SQL Server   
   |
 |IM003|IM   |ODBC driver   |003 
|Specified driver could not be loaded|SQL Server |N 
  |SQL Server   
   |
diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 6795ebcb0bd0..a808be9510cf 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1858,6 +1858,29 @@
 },
 "sqlState" : "42000"
   },
+  "INVALID_INVERSE_DISTRIBUTION_FUNCTION" : {
+"message" : [

(spark) branch master updated: [SPARK-46229][PYTHON][CONNECT][FOLLOW-UP] Remove unnecessary Python version check lower than 3.8

2023-12-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7af6a666859d [SPARK-46229][PYTHON][CONNECT][FOLLOW-UP] Remove 
unnecessary Python version check lower than 3.8
7af6a666859d is described below

commit 7af6a666859d2f614e9937375ea518f254bbad06
Author: Hyukjin Kwon 
AuthorDate: Tue Dec 5 09:54:37 2023 +0900

[SPARK-46229][PYTHON][CONNECT][FOLLOW-UP] Remove unnecessary Python version 
check lower than 3.8

### What changes were proposed in this pull request?

This PR addresses the review comment 
https://github.com/apache/spark/pull/44146#discussion_r1414336068.

### Why are the changes needed?

We don't support Python lower than 3.8 so we can remove that if-else on 
Python version.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests via CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44168 from HyukjinKwon/SPARK-46229-followup.

Lead-authored-by: Hyukjin Kwon 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/_typing.py | 10 +-
 1 file changed, 1 insertion(+), 9 deletions(-)

diff --git a/python/pyspark/sql/connect/_typing.py 
b/python/pyspark/sql/connect/_typing.py
index 392c62bf50d3..1b8516427dbd 100644
--- a/python/pyspark/sql/connect/_typing.py
+++ b/python/pyspark/sql/connect/_typing.py
@@ -14,16 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import sys
-
-if sys.version_info >= (3, 8):
-from typing import Protocol, Tuple
-else:
-from typing_extensions import Protocol
-
-from typing import Tuple
 from types import FunctionType
-from typing import Any, Callable, Iterable, Union, Optional, NewType
+from typing import Any, Callable, Iterable, Union, Optional, NewType, 
Protocol, Tuple
 import datetime
 import decimal
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions

2023-12-04 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 91a02e5d9701 [SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' 
partitioning/ordering columns to support general expressions
91a02e5d9701 is described below

commit 91a02e5d97011f5f9b620c07b1c2f7d85291448b
Author: Daniel Tenedorio 
AuthorDate: Mon Dec 4 16:51:56 2023 -0800

[SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' 
partitioning/ordering columns to support general expressions

### What changes were proposed in this pull request?

This PR updates the Python user-defined table function (UDTF) API for the 
`analyze` method to support general expressions for the `partitionBy` and 
`orderBy` fields of the `AnalyzeResult` class.

For example, the following UDTF specifies to partition by `partition_col / 
10` so that all rows with values of this column between 0-9 arrive in the same 
partition, then all rows with values between 10-19 in the next partition, and 
so on.

```
@udtf
class TestUDTF:
    def __init__(self):
        self._partition_col = None
        self._count = 0
        self._sum = 0
        self._last = None

    @staticmethod
    def analyze(*args, **kwargs):
        return AnalyzeResult(
            schema=StructType()
                .add("partition_col", IntegerType())
                .add("count", IntegerType())
                .add("total", IntegerType())
                .add("last", IntegerType()),
            partitionBy=[PartitioningColumn("partition_col / 10")],
            orderBy=[
                OrderingColumn(name="input", ascending=True, overrideNullsFirst=False)
            ],
        )

    def eval(self, row: Row):
        self._partition_col = row["partition_col"]
        self._count += 1
        self._last = row["input"]
        if row["input"] is not None:
            self._sum += row["input"]

    def terminate(self):
        yield self._partition_col, self._count, self._sum, self._last
```

### Why are the changes needed?

This lets the UDTF partition by simple references to the columns of the 
input table just like before, but also gives the option to partition by general 
expressions based on those columns (just like the explicit `PARTITION BY` and 
`ORDER BY` clauses in the UDTF call in SQL).
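
For context, the SQL counterpart mentioned above looks roughly like this 
(a sketch; `test_udtf`, the table `t`, and its columns are hypothetical names):

```
# Explicit PARTITION BY / ORDER BY expressions on the TABLE argument of a UDTF
# call -- the SQL analogue of returning partitionBy/orderBy from 'analyze'.
spark.sql("""
    SELECT * FROM test_udtf(
        TABLE(t) PARTITION BY partition_col / 10 ORDER BY input
    )
""").show()
```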

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR includes test coverage.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43946 from dtenedor/udtf-order-partition-by-exprs.

Authored-by: Daniel Tenedorio 
Signed-off-by: Takuya UESHIN 
---
 .../src/main/resources/error/error-classes.json|  6 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  4 +-
 docs/sql-error-conditions.md   |  6 +++
 python/docs/source/user_guide/sql/python_udtf.rst  |  2 +-
 python/pyspark/sql/udtf.py | 21 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  7 +++
 .../org/apache/spark/sql/UDTFRegistration.scala|  4 +-
 .../python/UserDefinedPythonFunction.scala | 53 -
 .../sql-tests/analyzer-results/udtf/udtf.sql.out   | 47 +++
 .../test/resources/sql-tests/inputs/udtf/udtf.sql  |  3 ++
 .../resources/sql-tests/results/udtf/udtf.sql.out  | 54 ++
 .../apache/spark/sql/IntegratedUDFTestUtils.scala  | 39 +---
 12 files changed, 206 insertions(+), 40 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 9e0019b34728..6795ebcb0bd0 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3110,6 +3110,12 @@
 ],
 "sqlState" : "42802"
   },
+  "UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD" : {
+"message" : [
+  "Failed to evaluate the user-defined table function because its 
'analyze' method returned a requested OrderingColumn whose column name 
expression included an unnecessary alias ; please remove this alias 
and then try the query again."
+],
+"sqlState" : "42802"
+  },
   "UNABLE_TO_ACQUIRE_MEMORY" : {
 "message" : [
   "Unable to acquire  bytes of memory, got 
."
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b64fecafa311..dc1730c78267 100644
--- 

(spark) branch master updated: [SPARK-46233][PYTHON] Migrate all remaining `AttributeError` into PySpark error framework

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new be49ca6dd71b [SPARK-46233][PYTHON] Migrate all remaining 
`AttributeError` into PySpark error framework
be49ca6dd71b is described below

commit be49ca6dd71b87172df9d88f305f06a7b87c9ecf
Author: Haejoon Lee 
AuthorDate: Mon Dec 4 16:18:27 2023 -0800

[SPARK-46233][PYTHON] Migrate all remaining `AttributeError` into PySpark 
error framework

### What changes were proposed in this pull request?

This PR proposes to migrate all remaining `AttributeError` usages in 
`pyspark/sql/*` to the PySpark error framework, raising `PySparkAttributeError` 
with dedicated error classes.

### Why are the changes needed?

To improve the error handling in PySpark.
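
As a quick sketch of what the migration means for callers (the error class name 
is taken from the diff below; the DataFrame and attribute are illustrative, and 
an active SparkSession `spark` is assumed):

```
from pyspark.errors import PySparkAttributeError

df = spark.range(1)
try:
    df.no_such_column  # accessing a non-existent column attribute
except PySparkAttributeError as e:
    # Errors now carry a structured error class instead of a bare AttributeError.
    print(e.getErrorClass())         # ATTRIBUTE_NOT_SUPPORTED
    print(e.getMessageParameters())  # {'attr_name': 'no_such_column'}
```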

### Does this PR introduce _any_ user-facing change?

No API changes, but the user-facing error messages will be improved.

### How was this patch tested?

The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44150 from itholic/migrate_attribute_error.

Authored-by: Haejoon Lee 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/connect/dataframe.py | 10 +++---
 python/pyspark/sql/dataframe.py | 11 ---
 python/pyspark/sql/types.py | 13 ++---
 3 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index a73a24818c0c..6a1d45712163 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -14,7 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from pyspark.errors.exceptions.base import SessionNotSameException, 
PySparkIndexError
+from pyspark.errors.exceptions.base import (
+SessionNotSameException,
+PySparkIndexError,
+PySparkAttributeError,
+)
 from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
@@ -1694,8 +1698,8 @@ class DataFrame:
 )
 
 if name not in self.columns:
-raise AttributeError(
-"'%s' object has no attribute '%s'" % 
(self.__class__.__name__, name)
+raise PySparkAttributeError(
+error_class="ATTRIBUTE_NOT_SUPPORTED", 
message_parameters={"attr_name": name}
 )
 
 return _to_col_with_plan_id(
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 8b40b222a289..5211d874ba33 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -43,7 +43,12 @@ from py4j.java_gateway import JavaObject, JVMView
 from pyspark import copy_func, _NoValue
 from pyspark._globals import _NoValueType
 from pyspark.context import SparkContext
-from pyspark.errors import PySparkTypeError, PySparkValueError, 
PySparkIndexError
+from pyspark.errors import (
+PySparkTypeError,
+PySparkValueError,
+PySparkIndexError,
+PySparkAttributeError,
+)
 from pyspark.rdd import (
 RDD,
 _load_from_socket,
@@ -3613,8 +3618,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 +---+
 """
 if name not in self.columns:
-raise AttributeError(
-"'%s' object has no attribute '%s'" % 
(self.__class__.__name__, name)
+raise PySparkAttributeError(
+error_class="ATTRIBUTE_NOT_SUPPORTED", 
message_parameters={"attr_name": name}
 )
 jc = self._jdf.apply(name)
 return Column(jc)
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index cbfc4ab5df02..d3eed77b3838 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -55,6 +55,7 @@ from pyspark.errors import (
 PySparkTypeError,
 PySparkValueError,
 PySparkIndexError,
+PySparkAttributeError,
 PySparkKeyError,
 )
 
@@ -2574,16 +2575,22 @@ class Row(tuple):
 
 def __getattr__(self, item: str) -> Any:
 if item.startswith("__"):
-raise AttributeError(item)
+raise PySparkAttributeError(
+error_class="ATTRIBUTE_NOT_SUPPORTED", 
message_parameters={"attr_name": item}
+)
 try:
 # it will be slow when it has many fields,
 # but this will not be used in normal cases
 idx = self.__fields__.index(item)
 return self[idx]
 except IndexError:
-raise AttributeError(item)
+raise PySparkAttributeError(
+error_class="ATTRIBUTE_NOT_SUPPORTED", 
message_parameters={"attr_name": item}
+)
 except ValueError:
-raise AttributeError(item)

(spark) branch master updated: [SPARK-45684][SQL][SS][TESTS][FOLLOWUP] Use `++` instead of `s.c.SeqOps#concat`

2023-12-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 19e68ba34098 [SPARK-45684][SQL][SS][TESTS][FOLLOWUP] Use `++` instead 
of `s.c.SeqOps#concat`
19e68ba34098 is described below

commit 19e68ba34098baeef617b31c227a14efc275c46d
Author: yangjie01 
AuthorDate: Tue Dec 5 08:50:29 2023 +0900

[SPARK-45684][SQL][SS][TESTS][FOLLOWUP] Use `++` instead of 
`s.c.SeqOps#concat`

### What changes were proposed in this pull request?
This PR uses `++` instead of `s.c.SeqOps#concat` to address the review comment: 
https://github.com/apache/spark/pull/43575#discussion_r1414163363

### Why are the changes needed?
`++` is an alias for `concat`, but `++` is more readable.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44161 from LuciferYang/SPARK-45684-FOLLOWUP.

Authored-by: yangjie01 
Signed-off-by: Hyukjin Kwon 
---
 .../src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 883f64ff7af4..cd0bbfd47b2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -174,7 +174,7 @@ class StreamSuite extends StreamTest {
 try {
   query.processAllAvailable()
   val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
-  checkDatasetUnorderly[Long](outputDf, (0L to 10L).concat(0L to 10L): 
_*)
+  checkDatasetUnorderly[Long](outputDf, (0L to 10L) ++ (0L to 10L): _*)
 } finally {
   query.stop()
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated (d314e4bba54a -> d5f2539f83f6)

2023-12-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d314e4bba54a [SPARK-46250][CONNECT][SS] Deflake test_parity_listener
 add d5f2539f83f6 [SPARK-45940][FOLLOWUP][TESTS] Only test Python data 
source when Python and PySpark environments are available

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala  | 3 +++
 1 file changed, 3 insertions(+)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46250][CONNECT][SS] Deflake test_parity_listener

2023-12-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d314e4bba54a [SPARK-46250][CONNECT][SS] Deflake test_parity_listener
d314e4bba54a is described below

commit d314e4bba54a08babad8f8ecd564e7440c0e97f4
Author: Wei Liu 
AuthorDate: Tue Dec 5 08:48:23 2023 +0900

[SPARK-46250][CONNECT][SS] Deflake test_parity_listener

### What changes were proposed in this pull request?

We didn’t give the onQueryTerminated handler enough time to write the data 
to its table, so it was possible to read from the `listener_terminated_event` 
table before it existed.
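
The general deflaking pattern is to poll for the side effect with a bounded 
timeout rather than rely on a fixed sleep; a small hypothetical helper along 
those lines (not part of this patch, which uses a progress-polling loop plus a 
sleep, as shown in the diff below):

```
import time

def wait_for_table(spark, table_name: str, timeout_sec: int = 120) -> None:
    """Poll until table_name exists and has at least one row, or time out."""
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if spark.catalog.tableExists(table_name) and spark.read.table(table_name).count() > 0:
            return
        time.sleep(1)
    raise TimeoutError(f"table {table_name} was not populated within {timeout_sec}s")
```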

### Why are the changes needed?

Deflake existing tests
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Test only change

### Was this patch authored or co-authored using generative AI tooling?

Closes #44166 from WweiL/SPARK-46250-deflaky-parity-listener.

Authored-by: Wei Liu 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/connect/streaming/test_parity_listener.py | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py 
b/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py
index ca02cf29ee7d..4fc040642bed 100644
--- a/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py
+++ b/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py
@@ -68,11 +68,14 @@ class 
StreamingListenerParityTests(StreamingListenerTestsMixin, ReusedConnectTes
 )
 
 self.assertTrue(q.isActive)
-time.sleep(10)
-self.assertTrue(q.lastProgress["batchId"] > 0)  # ensure at least 
one batch is ran
+# ensure at least one batch is ran
+while q.lastProgress is None or q.lastProgress["batchId"] == 0:
+time.sleep(5)
 q.stop()
 self.assertFalse(q.isActive)
 
+time.sleep(60)  # Sleep to make sure listener_terminated_events is 
written successfully
+
 start_event = pyspark.cloudpickle.loads(
 self.spark.read.table("listener_start_events").collect()[0][0]
 )


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46243][SQL][DOCS] Describe arguments of `decode()`

2023-12-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 47e8e02141d4 [SPARK-46243][SQL][DOCS] Describe arguments of `decode()`
47e8e02141d4 is described below

commit 47e8e02141d4f56f5cb92ec4762c9ea5c9b91e90
Author: Max Gekk 
AuthorDate: Tue Dec 5 08:31:49 2023 +0900

[SPARK-46243][SQL][DOCS] Describe arguments of `decode()`

### What changes were proposed in this pull request?
In the PR, I propose to update the description of the `StringDecode` 
expression, and accordingly the `decode()` function, by describing the arguments 
`bin` and `charset`.
The updated docs: 
https://github.com/apache/spark/assets/1580697/a177f81d-2f39-45ff-bc28-b217dad4e128
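
For reference, a usage sketch matching the documented arguments (taken from the 
example in the expression description; assumes an active SparkSession `spark`):

```
# decode(bin, charset): decode a binary value into a STRING using the given charset.
spark.sql("SELECT decode(encode('abc', 'utf-8'), 'utf-8') AS s").show()
# +---+
# |  s|
# +---+
# |abc|
# +---+
```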

### Why are the changes needed?
To improve user experience with Spark SQL by documenting the public 
function.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By manually checking the generated docs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44157 from MaxGekk/doc-decode-params.

Authored-by: Max Gekk 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/catalyst/expressions/stringExpressions.scala | 16 
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 7c5d65d2b958..259556826ad9 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -2594,6 +2594,11 @@ object Decode {
   the corresponding result. If no match is found, then it returns default. 
If default
   is omitted, it returns null.
   """,
+  arguments = """
+Arguments:
+  * bin - a binary expression to decode
+  * charset - one of the charsets 'US-ASCII', 'ISO-8859-1', 'UTF-8', 
'UTF-16BE', 'UTF-16LE', 'UTF-16' to decode `bin` into a STRING. It is case 
insensitive.
+  """,
   examples = """
 Examples:
   > SELECT _FUNC_(encode('abc', 'utf-8'), 'utf-8');
@@ -2623,18 +2628,21 @@ case class Decode(params: Seq[Expression], replacement: 
Expression)
 }
 
 /**
- * Decodes the first argument into a String using the provided character set
- * (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 
'UTF-16').
- * If either argument is null, the result will also be null.
+ * Decodes the first argument into a String using the provided character set.
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(bin, charset) - Decodes the first argument using the second 
argument character set.",
+  usage = "_FUNC_(bin, charset) - Decodes the first argument using the second 
argument character set. If either argument is null, the result will also be 
null.",
   examples = """
 Examples:
   > SELECT _FUNC_(encode('abc', 'utf-8'), 'utf-8');
abc
   """,
+  arguments = """
+Arguments:
+  * bin - a binary expression to decode
+  * charset - one of the charsets 'US-ASCII', 'ISO-8859-1', 'UTF-8', 
'UTF-16BE', 'UTF-16LE', 'UTF-16' to decode `bin` into a STRING. It is case 
insensitive.
+  """,
   since = "1.5.0",
   group = "string_funcs")
 // scalastyle:on line.size.limit


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch branch-3.3 updated: [SPARK-46239][CORE] Hide `Jetty` info

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new aaec17fb244c [SPARK-46239][CORE] Hide `Jetty` info
aaec17fb244c is described below

commit aaec17fb244c175068f4de52e1288acc6125c5e9
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 4 14:41:27 2023 -0800

[SPARK-46239][CORE] Hide `Jetty` info

**What changes were proposed in this pull request?**
The PR sets parameters to hide the Jetty version in Spark.

**Why are the changes needed?**
It prevents remote clients from obtaining web server version information over HTTP.

**Does this PR introduce any user-facing change?**
No

**How was this patch tested?**
Manual review

**Was this patch authored or co-authored using generative AI tooling?**
No

Closes #44158 from chenyu-opensource/branch-SPARK-46239.

Lead-authored-by: Dongjoon Hyun 
Co-authored-by: chenyu 
<119398199+chenyu-opensou...@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit ff4f59341215b7f3a87e6cd8798d49e25562fcd6)
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 834e4dfc4841..44bbd95fad13 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -311,6 +311,12 @@ private[spark] object JettyUtils extends Logging {
   logDebug(s"Using requestHeaderSize: $requestHeaderSize")
   httpConfig.setRequestHeaderSize(requestHeaderSize)
 
+  // Hide information.
+  logDebug("Using setSendServerVersion: false")
+  httpConfig.setSendServerVersion(false)
+  logDebug("Using setSendXPoweredBy: false")
+  httpConfig.setSendXPoweredBy(false)
+
   // If SSL is configured, create the secure connector first.
   val securePort = sslOptions.createJettySslContextFactory().map { factory 
=>
 val securePort = sslOptions.port.getOrElse(if (port > 0) 
Utils.userPort(port, 400) else 0)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch branch-3.4 updated: [SPARK-46239][CORE] Hide `Jetty` info

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 757c3a9d62b7 [SPARK-46239][CORE] Hide `Jetty` info
757c3a9d62b7 is described below

commit 757c3a9d62b7519f5bdc50d09e472b0490b6bae8
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 4 14:41:27 2023 -0800

[SPARK-46239][CORE] Hide `Jetty` info

**What changes were proposed in this pull request?**
The PR sets parameters to hide the Jetty version in Spark.

**Why are the changes needed?**
It prevents remote clients from obtaining web server version information over HTTP.

**Does this PR introduce any user-facing change?**
No

**How was this patch tested?**
Manual review

**Was this patch authored or co-authored using generative AI tooling?**
No

Closes #44158 from chenyu-opensource/branch-SPARK-46239.

Lead-authored-by: Dongjoon Hyun 
Co-authored-by: chenyu 
<119398199+chenyu-opensou...@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit ff4f59341215b7f3a87e6cd8798d49e25562fcd6)
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index d8119fb94984..2407152a5498 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -312,6 +312,12 @@ private[spark] object JettyUtils extends Logging {
   logDebug(s"Using requestHeaderSize: $requestHeaderSize")
   httpConfig.setRequestHeaderSize(requestHeaderSize)
 
+  // Hide information.
+  logDebug("Using setSendServerVersion: false")
+  httpConfig.setSendServerVersion(false)
+  logDebug("Using setSendXPoweredBy: false")
+  httpConfig.setSendXPoweredBy(false)
+
   // If SSL is configured, create the secure connector first.
   val securePort = sslOptions.createJettySslContextFactory().map { factory 
=>
 val securePort = sslOptions.port.getOrElse(if (port > 0) 
Utils.userPort(port, 400) else 0)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch branch-3.5 updated: [SPARK-46239][CORE] Hide `Jetty` info

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 1321b4e64dea [SPARK-46239][CORE] Hide `Jetty` info
1321b4e64dea is described below

commit 1321b4e64deaa1e58bf297c25b72319083056568
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 4 14:41:27 2023 -0800

[SPARK-46239][CORE] Hide `Jetty` info

**What changes were proposed in this pull request?**
The PR sets parameters to hide the Jetty version in Spark.

**Why are the changes needed?**
It prevents remote clients from obtaining web server version information over HTTP.

**Does this PR introduce any user-facing change?**
No

**How was this patch tested?**
Manual review

**Was this patch authored or co-authored using generative AI tooling?**
No

Closes #44158 from chenyu-opensource/branch-SPARK-46239.

Lead-authored-by: Dongjoon Hyun 
Co-authored-by: chenyu 
<119398199+chenyu-opensou...@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit ff4f59341215b7f3a87e6cd8798d49e25562fcd6)
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 9582bdbf5264..21753361e627 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -312,6 +312,12 @@ private[spark] object JettyUtils extends Logging {
   logDebug(s"Using requestHeaderSize: $requestHeaderSize")
   httpConfig.setRequestHeaderSize(requestHeaderSize)
 
+  // Hide information.
+  logDebug("Using setSendServerVersion: false")
+  httpConfig.setSendServerVersion(false)
+  logDebug("Using setSendXPoweredBy: false")
+  httpConfig.setSendXPoweredBy(false)
+
   // If SSL is configured, create the secure connector first.
   val securePort = sslOptions.createJettySslContextFactory().map { factory 
=>
 val securePort = sslOptions.port.getOrElse(if (port > 0) 
Utils.userPort(port, 400) else 0)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46239][CORE] Hide `Jetty` info

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ff4f59341215 [SPARK-46239][CORE] Hide `Jetty` info
ff4f59341215 is described below

commit ff4f59341215b7f3a87e6cd8798d49e25562fcd6
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 4 14:41:27 2023 -0800

[SPARK-46239][CORE] Hide `Jetty` info

**What changes were proposed in this pull request?**
The PR sets parameters to hide the Jetty version in Spark.

**Why are the changes needed?**
It prevents remote clients from obtaining web server version information over HTTP.
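
A quick way to observe the effect (a hypothetical check, not part of the patch; 
assumes a Spark UI listening on localhost:4040):

```
from urllib.request import urlopen

# With setSendServerVersion(false) and setSendXPoweredBy(false), responses should
# no longer advertise Jetty via the Server / X-Powered-By headers.
with urlopen("http://localhost:4040/") as resp:
    print(resp.headers.get("Server"))        # expected: None after this change
    print(resp.headers.get("X-Powered-By"))  # expected: None
```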

**Does this PR introduce any user-facing change?**
No

**How was this patch tested?**
Manual review

**Was this patch authored or co-authored using generative AI tooling?**
No

Closes #44158 from chenyu-opensource/branch-SPARK-46239.

Lead-authored-by: Dongjoon Hyun 
Co-authored-by: chenyu 
<119398199+chenyu-opensou...@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 22adcbc32ed8..50251975d733 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -314,6 +314,12 @@ private[spark] object JettyUtils extends Logging {
   logDebug(s"Using requestHeaderSize: $requestHeaderSize")
   httpConfig.setRequestHeaderSize(requestHeaderSize)
 
+  // Hide information.
+  logDebug("Using setSendServerVersion: false")
+  httpConfig.setSendServerVersion(false)
+  logDebug("Using setSendXPoweredBy: false")
+  httpConfig.setSendXPoweredBy(false)
+
   // If SSL is configured, create the secure connector first.
   val securePort = sslOptions.createJettySslContextFactory().map { factory 
=>
 val securePort = sslOptions.port.getOrElse(if (port > 0) 
Utils.userPort(port, 400) else 0)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch branch-3.3 updated: [SPARK-46092][SQL][3.3] Don't push down Parquet row group filters that overflow

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new c9412307394f [SPARK-46092][SQL][3.3] Don't push down Parquet row group 
filters that overflow
c9412307394f is described below

commit c9412307394fd1a277dd7fd5b173ec34e4b123d6
Author: Johan Lasperas 
AuthorDate: Mon Dec 4 12:50:57 2023 -0800

[SPARK-46092][SQL][3.3] Don't push down Parquet row group filters that 
overflow

This is a cherry-pick from https://github.com/apache/spark/pull/44006 to 
Spark 3.3

### What changes were proposed in this pull request?
This change adds a check for overflows when creating Parquet row group 
filters on an INT32 (byte/short/int) parquet type to avoid incorrectly skipping 
row groups if the predicate value doesn't fit in an INT. This can happen if the 
read schema is specified as LONG, e.g. via `.schema("col LONG")`.
While the Parquet readers don't support reading INT32 into a LONG, the 
overflow can lead to row groups being incorrectly skipped, bypassing the reader 
altogether and producing incorrect results instead of failing.
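
For illustration, a standalone sketch of that range check (the helper name here is hypothetical, not Spark's actual `ParquetFilters` code):
```
// Only build an INT32 row-group filter when the predicate value actually fits in an Int.
def fitsInInt32(value: Any): Boolean = value match {
  case _: java.lang.Byte | _: java.lang.Short | _: java.lang.Integer => true
  case v: java.lang.Long => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
  case _ => false
}

// A predicate value like Long.MaxValue does not fit in INT32, so no filter is pushed
// down and the row group is read instead of being skipped incorrectly.
assert(!fitsInInt32(java.lang.Long.valueOf(Long.MaxValue)))
assert(fitsInInt32(java.lang.Integer.valueOf(0)))
```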

### Why are the changes needed?
Reading a parquet file containing INT32 values with a read schema specified 
as LONG can produce incorrect results today:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < 
${Long.MaxValue}").collect()
```
will return an empty result. The correct result is either:
- Failing the query if the parquet reader doesn't support upcasting 
integers to longs (all parquet readers in Spark today)
- Returning result `[0]` if the parquet reader supports that upcast (no 
readers in Spark as of now, but I'm looking into adding this capability).

### Does this PR introduce _any_ user-facing change?
The following:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < 
${Long.MaxValue}").collect()
```
produces an (incorrect) empty result before this change. After this change, 
the read will fail, raising an error about the unsupported conversion from INT 
to LONG in the parquet reader.

### How was this patch tested?
- Added tests to `ParquetFilterSuite` to ensure that no row group filter is 
created when the predicate value overflows or when the value type isn't 
compatible with the parquet type
- Added test to `ParquetQuerySuite` covering the correctness issue 
described above.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44156 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.3.

Authored-by: Johan Lasperas 
Signed-off-by: Dongjoon Hyun 
---
 .../datasources/parquet/ParquetFilters.scala   | 10 ++-
 .../datasources/parquet/ParquetFilterSuite.scala   | 71 ++
 .../datasources/parquet/ParquetQuerySuite.scala| 20 ++
 3 files changed, 99 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 210f37d473ad..969fbab746ad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, 
Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float 
=> JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.{Date, Timestamp}
@@ -600,7 +600,13 @@ class ParquetFilters(
 value == null || (nameToParquetField(name).fieldType match {
   case ParquetBooleanType => value.isInstanceOf[JBoolean]
   case ParquetIntegerType if value.isInstanceOf[Period] => true
-  case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+  case ParquetByteType | ParquetShortType | ParquetIntegerType => value 
match {
+// Byte/Short/Int are all stored as INT32 in Parquet so filters are 
built using type Int.
+// We don't create a filter if the value would overflow.
+case _: JByte | _: JShort | _: Integer => true
+case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= 
Int.MaxValue
+case _ => false
+  }
   case ParquetLongType => value.isInstanceOf[JLong] || 
value.isInstanceOf[Duration]
   case ParquetFloatType => value.isInstanceOf[JFloat]
   case ParquetDoubleType => 

(spark) branch master updated: [SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace `s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter`

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ac0bd2eb7b40 [SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace 
`s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter`
ac0bd2eb7b40 is described below

commit ac0bd2eb7b4089096f9fb288482b2f1b5049b7e2
Author: yangjie01 
AuthorDate: Mon Dec 4 12:49:52 2023 -0800

[SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace 
`s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter`

### What changes were proposed in this pull request?
This PR uses `s.c.MapOps.filter` to simplify the code pattern 
`s.c.MapOps.view.filterKeys`.

### Why are the changes needed?
The coding pattern `s.c.MapOps.view.filterKeys` is verbose; it can be simplified with `s.c.MapOps.filter`.
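
As a rough illustration in plain Scala 2.13 (not Spark code), the two patterns produce the same result for key-based filtering:
```
val m = Map("a" -> 1, "b" -> 2, "c" -> 3)
val keep = Set("a", "c")

// Previous pattern: a lazy MapView, typically materialized again with .toMap.
val viaView: Map[String, Int] = m.view.filterKeys(keep.contains).toMap

// Simplified pattern adopted by this change: strict, returns a Map directly.
val viaFilter: Map[String, Int] = m.filter { case (k, _) => keep.contains(k) }

assert(viaView == viaFilter)
```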

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44160 from LuciferYang/SPARK-46245.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/kafka010/KafkaContinuousStream.scala |  2 +-
 .../scala/org/apache/spark/deploy/master/Master.scala  |  3 +--
 .../spark/deploy/rest/RestSubmissionClient.scala   |  4 ++--
 .../spark/executor/CoarseGrainedExecutorBackend.scala  |  8 
 .../org/apache/spark/resource/ResourceProfile.scala|  4 ++--
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  4 +++-
 .../cluster/CoarseGrainedSchedulerBackend.scala|  2 +-
 .../scheduler/cluster/StandaloneSchedulerBackend.scala |  4 ++--
 .../spark/storage/ShuffleBlockFetcherIterator.scala|  2 +-
 .../main/scala/org/apache/spark/ui/PagedTable.scala|  7 +++
 .../org/apache/spark/HeartbeatReceiverSuite.scala  |  2 +-
 .../scala/org/apache/spark/SparkThrowableSuite.scala   |  5 ++---
 .../spark/internal/plugin/PluginContainerSuite.scala   |  2 +-
 .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala  |  6 --
 .../apache/spark/deploy/yarn/ExecutorRunnable.scala|  2 +-
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala   |  2 +-
 .../apache/spark/sql/catalyst/catalog/interface.scala  |  8 +++-
 .../catalyst/expressions/codegen/CodeGenerator.scala   |  2 +-
 .../catalyst/plans/logical/basicLogicalOperators.scala |  5 +++--
 .../scala/org/apache/spark/sql/DataFrameWriter.scala   |  2 +-
 .../apache/spark/sql/execution/command/tables.scala|  7 ---
 .../spark/sql/execution/datasources/DataSource.scala   |  4 ++--
 .../spark/sql/execution/datasources/FileFormat.scala   |  2 +-
 .../sql/execution/datasources/jdbc/JDBCOptions.scala   |  3 ++-
 .../sql/execution/datasources/v2/CacheTableExec.scala  |  3 ++-
 .../execution/datasources/v2/DataSourceV2Utils.scala   |  2 +-
 .../execution/datasources/v2/FileDataSourceV2.scala|  2 +-
 .../execution/datasources/v2/ShowCreateTableExec.scala | 18 ++
 .../execution/datasources/v2/V2SessionCatalog.scala|  4 ++--
 .../execution/streaming/state/RocksDBFileManager.scala |  6 +++---
 .../apache/spark/sql/execution/ui/ExecutionPage.scala  |  4 ++--
 .../apache/spark/sql/streaming/DataStreamReader.scala  |  2 +-
 .../apache/spark/sql/streaming/DataStreamWriter.scala  |  2 +-
 .../apache/spark/sql/hive/HiveExternalCatalog.scala| 11 ++-
 .../apache/spark/sql/hive/HiveMetastoreCatalog.scala   |  7 +++
 .../apache/spark/sql/hive/execution/HiveOptions.scala  |  6 +++---
 .../spark/sql/hive/HiveSchemaInferenceSuite.scala  |  2 +-
 .../org/apache/spark/sql/hive/StatisticsSuite.scala| 10 +-
 .../apache/spark/sql/hive/execution/HiveDDLSuite.scala |  8 
 .../hive/execution/command/ShowCreateTableSuite.scala  |  2 +-
 40 files changed, 93 insertions(+), 88 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 026c4d560722..a86acd971a1c 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -102,7 +102,7 @@ class KafkaContinuousStream(
 }
 
 val startOffsets = newPartitionOffsets ++
-  oldStartPartitionOffsets.view.filterKeys(!deletedPartitions.contains(_))
+  oldStartPartitionOffsets.filter { case (k, _) => 
!deletedPartitions.contains(k) }
 knownPartitions = startOffsets.keySet
 
 startOffsets.toSeq.map {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0fe72e28ea5b..2e1d7b9bce33 

(spark) branch master updated: [SPARK-32246][BUILD][INFRA] Enable `streaming-kinesis-asl` tests in Github Action CI

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 64ec5f1017e1 [SPARK-32246][BUILD][INFRA] Enable 
`streaming-kinesis-asl` tests in Github Action CI
64ec5f1017e1 is described below

commit 64ec5f1017e1f2ca479060ca76f18b1c4a803b81
Author: Junyu Chen 
AuthorDate: Mon Dec 4 12:44:11 2023 -0800

[SPARK-32246][BUILD][INFRA] Enable `streaming-kinesis-asl` tests in Github 
Action CI

### What changes were proposed in this pull request?
This PR attempts to set up the Kinesis tests in one of the existing GitHub Actions jobs. Note that there are currently 57 tests in total in the kinesis-asl module, and this PR enables 35 of them. The remaining tests require interaction with the Amazon Kinesis service, which would incur billing costs for users, so they are not included in the GitHub Action.

### Why are the changes needed?

This addresses the comments in https://github.com/apache/spark/pull/42581#issuecomment-1685925739, which attempts to upgrade the AWS SDK to v2 for the Spark Kinesis connector. Since the Kinesis tests are not run in GitHub Actions, there is no automated mechanism to verify the SDK v2 upgrade changes in this module.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1. All existing Github Actions passed.
2. All Kinesis tests passed when running locally: `export 
ENABLE_KINESIS_TESTS=1 && mvn test -Pkinesis-asl -pl connector/kinesis-asl`
```
Tests: succeeded 57, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
[INFO] 

[INFO] BUILD SUCCESS
[INFO] 

[INFO] Total time:  13:25 min
[INFO] Finished at: 2023-11-12T00:15:49+08:00
[INFO] 

```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43736 from junyuc25/junyuc25/kinesis-test.

Authored-by: Junyu Chen 
Signed-off-by: Dongjoon Hyun 
---
 .github/workflows/build_and_test.yml | 2 +-
 dev/sparktestsupport/modules.py  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 4612b504ccdf..d58356ec1c5d 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -150,7 +150,7 @@ jobs:
   - >-
 mllib-local, mllib, graphx
   - >-
-streaming, sql-kafka-0-10, streaming-kafka-0-10,
+streaming, sql-kafka-0-10, streaming-kafka-0-10, 
streaming-kinesis-asl,
 yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
 connect, protobuf
 # Here, we split Hive and SQL tests into some of slow ones and the 
rest of them.
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 8995b7de0df9..15b2e8f186e5 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -359,7 +359,7 @@ streaming_kinesis_asl = Module(
 build_profile_flags=[
 "-Pkinesis-asl",
 ],
-environ={"ENABLE_KINESIS_TESTS": "1"},
+environ={"ENABLE_KINESIS_TESTS": "0"},
 sbt_test_goals=[
 "streaming-kinesis-asl/test",
 ],





(spark) branch branch-3.4 updated: [SPARK-46092][SQL][3.4] Don't push down Parquet row group filters that overflow

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 05b5c9e2e3df [SPARK-46092][SQL][3.4] Don't push down Parquet row group 
filters that overflow
05b5c9e2e3df is described below

commit 05b5c9e2e3dfb7641e59895afc8ecb0f4f861127
Author: Johan Lasperas 
AuthorDate: Mon Dec 4 08:59:21 2023 -0800

[SPARK-46092][SQL][3.4] Don't push down Parquet row group filters that 
overflow

This is a cherry-pick from https://github.com/apache/spark/pull/44006 to 
spark 3.4

### What changes were proposed in this pull request?
This change adds a check for overflows when creating Parquet row group 
filters on an INT32 (byte/short/int) parquet type to avoid incorrectly skipping 
row groups if the predicate value doesn't fit in an INT. This can happen if the 
read schema is specified as LONG, e.g. via `.schema("col LONG")`.
While the Parquet readers don't support reading INT32 into a LONG, the 
overflow can lead to row groups being incorrectly skipped, bypassing the reader 
altogether and producing incorrect results instead of failing.

### Why are the changes needed?
Reading a parquet file containing INT32 values with a read schema specified 
as LONG can produce incorrect results today:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < 
${Long.MaxValue}").collect()
```
will return an empty result. The correct result is either:
- Failing the query if the parquet reader doesn't support upcasting 
integers to longs (all parquet readers in Spark today)
- Returning result `[0]` if the parquet reader supports that upcast (no 
readers in Spark as of now, but I'm looking into adding this capability).

### Does this PR introduce _any_ user-facing change?
The following:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < 
${Long.MaxValue}").collect()
```
produces an (incorrect) empty result before this change. After this change, 
the read will fail, raising an error about the unsupported conversion from INT 
to LONG in the parquet reader.

### How was this patch tested?
- Added tests to `ParquetFilterSuite` to ensure that no row group filter is 
created when the predicate value overflows or when the value type isn't 
compatible with the parquet type
- Added test to `ParquetQuerySuite` covering the correctness issue 
described above.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44155 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.4.

Authored-by: Johan Lasperas 
Signed-off-by: Dongjoon Hyun 
---
 .../datasources/parquet/ParquetFilters.scala   | 10 ++-
 .../datasources/parquet/ParquetFilterSuite.scala   | 71 ++
 .../datasources/parquet/ParquetQuerySuite.scala| 20 ++
 3 files changed, 99 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 6994e1ba39d9..5943dbdfb786 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, 
Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float 
=> JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.{Date, Timestamp}
@@ -612,7 +612,13 @@ class ParquetFilters(
 value == null || (nameToParquetField(name).fieldType match {
   case ParquetBooleanType => value.isInstanceOf[JBoolean]
   case ParquetIntegerType if value.isInstanceOf[Period] => true
-  case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+  case ParquetByteType | ParquetShortType | ParquetIntegerType => value 
match {
+// Byte/Short/Int are all stored as INT32 in Parquet so filters are 
built using type Int.
+// We don't create a filter if the value would overflow.
+case _: JByte | _: JShort | _: Integer => true
+case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= 
Int.MaxValue
+case _ => false
+  }
   case ParquetLongType => value.isInstanceOf[JLong] || 
value.isInstanceOf[Duration]
   case ParquetFloatType => value.isInstanceOf[JFloat]
   case ParquetDoubleType => 

(spark) branch branch-3.5 updated: [SPARK-46092][SQL][3.5] Don't push down Parquet row group filters that overflow

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 97472c91ed56 [SPARK-46092][SQL][3.5] Don't push down Parquet row group 
filters that overflow
97472c91ed56 is described below

commit 97472c91ed5660c5af862e8da99d44a1c24f2815
Author: Johan Lasperas 
AuthorDate: Mon Dec 4 08:58:03 2023 -0800

[SPARK-46092][SQL][3.5] Don't push down Parquet row group filters that 
overflow

This is a cherry-pick from https://github.com/apache/spark/pull/44006 to 
spark 3.5

### What changes were proposed in this pull request?
This change adds a check for overflows when creating Parquet row group 
filters on an INT32 (byte/short/int) parquet type to avoid incorrectly skipping 
row groups if the predicate value doesn't fit in an INT. This can happen if the 
read schema is specified as LONG, e.g. via `.schema("col LONG")`.
While the Parquet readers don't support reading INT32 into a LONG, the 
overflow can lead to row groups being incorrectly skipped, bypassing the reader 
altogether and producing incorrect results instead of failing.

### Why are the changes needed?
Reading a parquet file containing INT32 values with a read schema specified 
as LONG can produce incorrect results today:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < 
${Long.MaxValue}").collect()
```
will return an empty result. The correct result is either:
- Failing the query if the parquet reader doesn't support upcasting 
integers to longs (all parquet readers in Spark today)
- Returning result `[0]` if the parquet reader supports that upcast (no 
readers in Spark as of now, but I'm looking into adding this capability).

### Does this PR introduce _any_ user-facing change?
The following:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < 
${Long.MaxValue}").collect()
```
produces an (incorrect) empty result before this change. After this change, 
the read will fail, raising an error about the unsupported conversion from INT 
to LONG in the parquet reader.

### How was this patch tested?
- Added tests to `ParquetFilterSuite` to ensure that no row group filter is 
created when the predicate value overflows or when the value type isn't 
compatible with the parquet type
- Added test to `ParquetQuerySuite` covering the correctness issue 
described above.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44154 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.5.

Authored-by: Johan Lasperas 
Signed-off-by: Dongjoon Hyun 
---
 .../datasources/parquet/ParquetFilters.scala   | 10 ++-
 .../datasources/parquet/ParquetFilterSuite.scala   | 71 ++
 .../datasources/parquet/ParquetQuerySuite.scala| 20 ++
 3 files changed, 99 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 5899b6621ad8..0983841dc8c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, 
Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float 
=> JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.{Date, Timestamp}
@@ -612,7 +612,13 @@ class ParquetFilters(
 value == null || (nameToParquetField(name).fieldType match {
   case ParquetBooleanType => value.isInstanceOf[JBoolean]
   case ParquetIntegerType if value.isInstanceOf[Period] => true
-  case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+  case ParquetByteType | ParquetShortType | ParquetIntegerType => value 
match {
+// Byte/Short/Int are all stored as INT32 in Parquet so filters are 
built using type Int.
+// We don't create a filter if the value would overflow.
+case _: JByte | _: JShort | _: Integer => true
+case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= 
Int.MaxValue
+case _ => false
+  }
   case ParquetLongType => value.isInstanceOf[JLong] || 
value.isInstanceOf[Duration]
   case ParquetFloatType => value.isInstanceOf[JFloat]
   case ParquetDoubleType => 

(spark) branch master updated: [SPARK-46231][PYTHON] Migrate all remaining `NotImplementedError` & `TypeError` into PySpark error framework

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9666bf37958e [SPARK-46231][PYTHON] Migrate all remaining 
`NotImplementedError` & `TypeError` into PySpark error framework
9666bf37958e is described below

commit 9666bf37958e5381278ca622bf7ec4b4ccb13d79
Author: Haejoon Lee 
AuthorDate: Mon Dec 4 08:54:50 2023 -0800

[SPARK-46231][PYTHON] Migrate all remaining `NotImplementedError` & 
`TypeError` into PySpark error framework

### What changes were proposed in this pull request?

This PR proposes to migrate all remaining `NotImplementedError` and `TypeError` exceptions in `pyspark/sql/*` to the PySpark error framework (`PySparkNotImplementedError`), assigning dedicated error classes.

### Why are the changes needed?

To improve the error handling in PySpark.

### Does this PR introduce _any_ user-facing change?

No API changes, but the user-facing error messages will be improved.

### How was this patch tested?

The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44148 from itholic/not_impl_and_type.

Authored-by: Haejoon Lee 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/datasource.py| 21 +
 .../sql/tests/pandas/test_pandas_udf_grouped_agg.py |  3 +++
 python/pyspark/sql/udf.py   |  8 +++-
 3 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py
index 1c5b6d663285..4713ca5366a7 100644
--- a/python/pyspark/sql/datasource.py
+++ b/python/pyspark/sql/datasource.py
@@ -19,6 +19,7 @@ from typing import final, Any, Dict, Iterator, List, 
Sequence, Tuple, Type, Unio
 
 from pyspark.sql import Row
 from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
 
 if TYPE_CHECKING:
 from pyspark.sql._typing import OptionalPrimitiveType
@@ -103,7 +104,10 @@ class DataSource(ABC):
 >>> def schema(self):
 ...   return StructType().add("a", "int").add("b", "string")
 """
-raise NotImplementedError
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "schema"},
+)
 
 def reader(self, schema: StructType) -> "DataSourceReader":
 """
@@ -121,7 +125,10 @@ class DataSource(ABC):
 reader : DataSourceReader
 A reader instance for this data source.
 """
-raise NotImplementedError
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "reader"},
+)
 
 def writer(self, schema: StructType, saveMode: str) -> "DataSourceWriter":
 """
@@ -142,7 +149,10 @@ class DataSource(ABC):
 writer : DataSourceWriter
 A writer instance for this data source.
 """
-raise NotImplementedError
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "writer"},
+)
 
 
 class InputPartition:
@@ -239,7 +249,10 @@ class DataSourceReader(ABC):
 >>> def partitions(self):
 ... return [RangeInputPartition(1, 3), RangeInputPartition(5, 10)]
 """
-raise NotImplementedError
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "partitions"},
+)
 
 @abstractmethod
 def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
index b500be7a9695..455bb09a7dc4 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
@@ -720,6 +720,9 @@ class GroupedAggPandasUDFTestsMixin:
 
 
 class GroupedAggPandasUDFTests(GroupedAggPandasUDFTestsMixin, 
ReusedSQLTestCase):
+def test_unsupported_types(self):
+super().test_unsupported_types()
+
 pass
 
 
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 9ffdbb218711..351bcea3f389 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -339,7 +339,13 @@ class UserDefinedFunction:
 try:
 # StructType is not yet allowed as a return type, explicitly 
check here to fail fast
 if isinstance(self._returnType_placeholder, StructType):
-raise TypeError
+raise PySparkNotImplementedError(
+

(spark) branch master updated: [SPARK-46237][SQL][TESTS] Make `HiveDDLSuite` independently testable

2023-12-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 37d19b9ee0e4 [SPARK-46237][SQL][TESTS] Make `HiveDDLSuite` 
independently testable
37d19b9ee0e4 is described below

commit 37d19b9ee0e4e100e37358e71d771a2e42d01d88
Author: yangjie01 
AuthorDate: Mon Dec 4 08:52:23 2023 -0800

[SPARK-46237][SQL][TESTS] Make `HiveDDLSuite` independently testable

### What changes were proposed in this pull request?
When I test `HiveDDLSuite` with

```
build/sbt "hive/testOnly org.apache.spark.sql.hive.execution.HiveDDLSuite" 
-Phive
```
This test throws an error:

```
[info] - SPARK-34261: Avoid side effect if create exists temporary function 
*** FAILED *** (4 milliseconds)
[info]   java.util.NoSuchElementException: key not found: default
[info]   at scala.collection.MapOps.default(Map.scala:274)
[info]   at scala.collection.MapOps.default$(Map.scala:273)
[info]   at scala.collection.AbstractMap.default(Map.scala:405)
[info]   at scala.collection.MapOps.apply(Map.scala:176)
[info]   at scala.collection.MapOps.apply$(Map.scala:175)
[info]   at scala.collection.AbstractMap.apply(Map.scala:405)
[info]   at 
org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$445(HiveDDLSuite.scala:3275)
[info]   at 
org.apache.spark.sql.test.SQLTestUtilsBase.withUserDefinedFunction(SQLTestUtils.scala:256)
[info]   at 
org.apache.spark.sql.test.SQLTestUtilsBase.withUserDefinedFunction$(SQLTestUtils.scala:254)
[info]   at 
org.apache.spark.sql.execution.command.DDLSuite.withUserDefinedFunction(DDLSuite.scala:326)
[info]   at 
org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$444(HiveDDLSuite.scala:3267)
```

I manually printed the content of `spark.sparkContext.addedJars`, which is 
an empty `Map`.

However, when I execute

```
build/sbt "hive/testOnly org.apache.spark.sql.hive.execution.SQLQuerySuite 
org.apache.spark.sql.hive.execution.HiveDDLSuite" -Phive
```
All tests pass, and the content of `spark.sparkContext.addedJars` is

```
Map(default -> Map(spark://localhost:54875/jars/SPARK-21101-1.0.jar -> 
1701676986594, spark://localhost:54875/jars/hive-contrib-2.3.9.jar -> 
1701676944590, spark://localhost:54875/jars/TestUDTF.jar -> 1701676921340))
```

This failure is not reproduced in the GitHub Actions test because `SQLQuerySuite` is indeed executed before `HiveDDLSuite` there.

So in the current PR, I change the code to use `.get("default").foreach(_.remove(k))`, so that the remove operation is only performed when `.get("default")` is not `None`.
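
As a rough sketch of the difference (plain mutable maps standing in for `spark.sparkContext.addedJars`; the types here are illustrative, not Spark's actual ones):
```
import scala.collection.mutable

// Stand-in for spark.sparkContext.addedJars when no earlier suite has registered jars.
val addedJars = mutable.Map.empty[String, mutable.Map[String, Long]]

// Original pattern: Map.apply throws NoSuchElementException because "default" is absent.
// addedJars("default").remove("some.jar")

// Patched pattern: Option-based lookup, a harmless no-op when the key is missing.
addedJars.get("default").foreach(_.remove("some.jar"))
```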

### Why are the changes needed?
Make `HiveDDLSuite` independently testable.

### Does this PR introduce _any_ user-facing change?
No, this is a test-only change.

### How was this patch tested?
- Pass GitHub Actions
- Manually checked `HiveDDLSuite` with this PR; all tests passed

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44153 from LuciferYang/HiveDDLSuite.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 .../test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index c3a528da382a..2f5d1fcbb540 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -3270,7 +3270,7 @@ class HiveDDLSuite
   val jarName = "TestUDTF.jar"
   val jar = 
spark.asInstanceOf[TestHiveSparkSession].getHiveFile(jarName).toURI.toString
   spark.sparkContext.allAddedJars.keys.find(_.contains(jarName))
-.foreach(spark.sparkContext.addedJars("default").remove)
+.foreach(k => 
spark.sparkContext.addedJars.get("default").foreach(_.remove(k)))
   assert(!spark.sparkContext.listJars().exists(_.contains(jarName)))
   val e = intercept[AnalysisException] {
 sql("CREATE TEMPORARY FUNCTION f1 AS " +





(spark) branch master updated: [SPARK-46234][PYTHON] Introduce `PySparkKeyError` for PySpark error framework

2023-12-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2423a4517ace [SPARK-46234][PYTHON] Introduce `PySparkKeyError` for 
PySpark error framework
2423a4517ace is described below

commit 2423a4517ace1d76938ee3fd285594900f272552
Author: Haejoon Lee 
AuthorDate: Mon Dec 4 18:37:36 2023 +0800

[SPARK-46234][PYTHON] Introduce `PySparkKeyError` for PySpark error 
framework

### What changes were proposed in this pull request?

This PR proposes to introduce `PySparkKeyError` for the error framework, and migrate the Python built-in `KeyError` to `PySparkKeyError`.

### Why are the changes needed?

For better error handling.

### Does this PR introduce _any_ user-facing change?

No API changes, but it improves the user-facing error messages.

### How was this patch tested?

The existing CI should pass

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44151 from itholic/pyspark_keyerror.

Authored-by: Haejoon Lee 
Signed-off-by: Ruifeng Zheng 
---
 python/docs/source/reference/pyspark.errors.rst | 1 +
 python/pyspark/errors/__init__.py   | 2 ++
 python/pyspark/errors/error_classes.py  | 5 +
 python/pyspark/errors/exceptions/base.py| 6 ++
 python/pyspark/sql/types.py | 9 +++--
 5 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/python/docs/source/reference/pyspark.errors.rst 
b/python/docs/source/reference/pyspark.errors.rst
index 88cbd405b83d..a4997506b41e 100644
--- a/python/docs/source/reference/pyspark.errors.rst
+++ b/python/docs/source/reference/pyspark.errors.rst
@@ -38,6 +38,7 @@ Classes
 PySparkAssertionError
 PySparkAttributeError
 PySparkException
+PySparkKeyError
 PySparkNotImplementedError
 PySparkPicklingError
 PySparkRuntimeError
diff --git a/python/pyspark/errors/__init__.py 
b/python/pyspark/errors/__init__.py
index 923cb665d112..07033d216432 100644
--- a/python/pyspark/errors/__init__.py
+++ b/python/pyspark/errors/__init__.py
@@ -46,6 +46,7 @@ from pyspark.errors.exceptions.base import (  # noqa: F401
 PySparkAssertionError,
 PySparkNotImplementedError,
 PySparkPicklingError,
+PySparkKeyError,
 )
 
 
@@ -77,4 +78,5 @@ __all__ = [
 "PySparkAssertionError",
 "PySparkNotImplementedError",
 "PySparkPicklingError",
+"PySparkKeyError",
 ]
diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index d0c0d1c115b0..8ca73ca85de6 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -387,6 +387,11 @@ ERROR_CLASSES_JSON = """
   "Attribute `` is not supported in Spark Connect as it depends 
on the JVM. If you need to use this attribute, do not use Spark Connect when 
creating your session. Visit 
https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession
 for creating regular Spark Session in detail."
 ]
   },
+  "KEY_NOT_EXISTS" : {
+"message" : [
+  "Key `` is not exists."
+]
+  },
   "KEY_VALUE_PAIR_REQUIRED" : {
 "message" : [
   "Key-value pair or a list of pairs is required."
diff --git a/python/pyspark/errors/exceptions/base.py 
b/python/pyspark/errors/exceptions/base.py
index 4a2b31418e29..0f4001483b7f 100644
--- a/python/pyspark/errors/exceptions/base.py
+++ b/python/pyspark/errors/exceptions/base.py
@@ -264,3 +264,9 @@ class PySparkImportError(PySparkException, ImportError):
 """
 Wrapper class for ImportError to support error classes.
 """
+
+
+class PySparkKeyError(PySparkException, KeyError):
+"""
+Wrapper class for KeyError to support error classes.
+"""
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 932a5a703ea9..cbfc4ab5df02 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -55,6 +55,7 @@ from pyspark.errors import (
 PySparkTypeError,
 PySparkValueError,
 PySparkIndexError,
+PySparkKeyError,
 )
 
 if has_numpy:
@@ -1042,7 +1043,9 @@ class StructType(DataType):
 for field in self:
 if field.name == key:
 return field
-raise KeyError("No StructField named {0}".format(key))
+raise PySparkKeyError(
+error_class="KEY_NOT_EXISTS", message_parameters={"key": 
str(key)}
+)
 elif isinstance(key, int):
 try:
 return self.fields[key]
@@ -2563,7 +2566,9 @@ class Row(tuple):
 idx = self.__fields__.index(item)
 return super(Row, self).__getitem__(idx)
 except IndexError:
-raise KeyError(item)
+raise PySparkKeyError(
+ 

(spark) branch master updated: [SPARK-46186][CONNECT] Fix illegal state transition when ExecuteThreadRunner interrupted before started

2023-12-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new be9a710efb7b [SPARK-46186][CONNECT] Fix illegal state transition when 
ExecuteThreadRunner interrupted before started
be9a710efb7b is described below

commit be9a710efb7b1b9565c52766728d793fb497b483
Author: Juliusz Sompolski 
AuthorDate: Mon Dec 4 19:13:18 2023 +0900

[SPARK-46186][CONNECT] Fix illegal state transition when 
ExecuteThreadRunner interrupted before started

### What changes were proposed in this pull request?

A race condition when sending interrupt (or releaseSession) just after execute could cause:
```
[info]   org.apache.spark.SparkException: java.lang.IllegalStateException:
[info] operationId: fa653330-587a-41c7-a4a9-7987f070dcba with 
status Started
[info] is not within statuses List(Finished, Failed, Canceled) for 
event Closed
```
and
```
[info]   org.apache.spark.SparkException: java.lang.IllegalStateException:
[info] operationId: fa653330-587a-41c7-a4a9-7987f070dcba with 
status Closed
[info] is not within statuses List(Started, Analyzed, 
ReadyForExecution, Finished, Failed) for event Canceled
```
when the interrupt arrives before the thread in ExecuteThreadRunner is 
started.

This would cause the following in `ExecuteHolder` close:
```
  runner.interrupt() <- just sets interrupted = true
  runner.join() <- thread didn't start yet, so join() returns 
immediately, doesn't wait for thread to be actually interrupted
  ...
  eventsManager.postClosed() <- causes the first failure, because 
thread wasn't running and didn't move to Canceled
```
Afterwards, assuming we allowed that transition, the thread would get started, notice the `interrupted` state, and try to move immediately to Canceled. It would then hit the second error, since Canceled is not allowed after Closed.

While we could consider allowing the first transition (Started -> Closed), we don't want any events to come after Closed, so that listeners can clean up their state once they see Closed.

The fix is to handle interrupts that arrive before the thread has started, and to prevent the thread from even starting if it was already interrupted.
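
An illustrative sketch of that handshake (simplified names, not the actual `ExecuteThreadRunner` code):
```
// An interrupt arriving before start() must prevent the worker thread from ever starting.
class Runner(body: () => Unit) {
  private val lock = new Object
  private var started = false
  private var interrupted = false
  private val thread = new Thread(() => body())

  def start(): Unit = lock.synchronized {
    // Do not start the thread if an interrupt already arrived.
    if (!interrupted && !started) {
      thread.start()
      started = true
    }
  }

  def interrupt(): Unit = lock.synchronized {
    interrupted = true
    // Only a thread that actually started needs a JVM-level interrupt.
    if (started) thread.interrupt()
  }

  def join(): Unit = if (lock.synchronized(started)) thread.join()
}
```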

### Why are the changes needed?

This was detected after the gRPC 1.56 to 1.59 upgrade and causes some tests in SparkConnectServiceE2ESuite and ReattachableExecuteSuite to be flaky.
With the gRPC upgrade, execute is eagerly sent to the server, and in some tests we clean up and release the session without waiting for the execution to start. This triggered the flakiness.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A test was added. It depends on timing, so it may not fail reliably, only from time to time.

### Was this patch authored or co-authored using generative AI tooling?

GitHub Copilot assisted with some boilerplate auto-completion.

Generated-by: Github Copilot

Closes #44095 from juliuszsompolski/SPARK-46186.

Authored-by: Juliusz Sompolski 
Signed-off-by: Hyukjin Kwon 
---
 .../connect/execution/ExecuteThreadRunner.scala| 33 ++
 .../spark/sql/connect/service/ExecuteHolder.scala  |  7 -
 .../execution/ReattachableExecuteSuite.scala   | 26 +
 3 files changed, 54 insertions(+), 12 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
index 24b3c302b759..0ecdc4bdef96 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
@@ -42,6 +42,8 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
   // forwarding of thread locals needs to be taken into account.
   private val executionThread: Thread = new ExecutionThread()
 
+  private var started: Boolean = false
+
   private var interrupted: Boolean = false
 
   private var completed: Boolean = false
@@ -49,12 +51,21 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
   private val lock = new Object
 
   /** Launches the execution in a background thread, returns immediately. */
-  def start(): Unit = {
-executionThread.start()
+  private[connect] def start(): Unit = {
+lock.synchronized {
+  assert(!started)
+  // Do not start if already interrupted.
+  if (!interrupted) {
+executionThread.start()
+started = true
+