(spark) branch master updated: [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1d80d80a56c4 [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations
1d80d80a56c4 is described below

commit 1d80d80a56c418f841e282ad753fad6671c3baae
Author: Anish Shrigondekar
AuthorDate: Tue Dec 5 15:00:08 2023 +0900

[SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations

### What changes were proposed in this pull request?

Require the instance lock when acquiring RocksDB metrics, to prevent a race with background operations.

### Why are the changes needed?

The changes are needed to avoid races where the stateful operator tries to set store metrics after the commit, while the DB instance has already been closed/aborted/reloaded. We have seen a few query failures with the following stack trace due to this reason:

```
org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 3 in stage 531.0 failed 1 times, most recent failure: Lost task 3.0 in stage 531.0 (TID 1544) (ip-10-110-29-251.us-west-2.compute.internal executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.execution.streaming.state.RocksDB.getDBProperty(RocksDB.scala:838)
    at org.apache.spark.sql.execution.streaming.state.RocksDB.metrics(RocksDB.scala:678)
    at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.metrics(RocksDBStateStoreProvider.scala:137)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics(statefulOperators.scala:198)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics$(statefulOperators.scala:197)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.setStoreMetrics(statefulOperators.scala:495)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.close(statefulOperators.scala:626)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:498)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1743)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:552)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:482)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:557)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:445)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Modified existing unit tests

```
[info] Run completed in 1 minute, 31 seconds.
[info] Total number of tests run: 150
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 150, failed 0, canceled 0, ignored 0, pending 0
[info] All
```
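The fix described above takes the RocksDB instance lock before reading metrics, so a concurrent close/abort/reload cannot tear down the native handle mid-read. The pattern can be sketched in pure Python (hypothetical names; a simplification, not Spark's actual Scala code):

```python
import threading

class MetricsStore:
    """Sketch of the locking pattern (hypothetical names, not Spark's code)."""

    def __init__(self):
        self._lock = threading.RLock()  # stands in for the RocksDB instance lock
        self._db = {"rocksdb.estimate-num-keys": 42}

    def close(self):
        # Background maintenance may close/reload the instance at any time.
        with self._lock:
            self._db = None

    def metrics(self):
        # Taking the same lock serializes metrics reads against close/reload,
        # so we never dereference a handle that was just torn down.
        with self._lock:
            if self._db is None:
                return {}  # degrade gracefully instead of an NPE-style crash
            return dict(self._db)

store = MetricsStore()
assert store.metrics()["rocksdb.estimate-num-keys"] == 42
store.close()
assert store.metrics() == {}
```

Without the lock, the `metrics()` read could observe `self._db` between the check and the dereference, which is the race the commit eliminates.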
(spark) branch master updated: [SPARK-46255][PYTHON][CONNECT] Support complex type -> string conversion
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3c8c7ad92cdc [SPARK-46255][PYTHON][CONNECT] Support complex type -> string conversion
3c8c7ad92cdc is described below

commit 3c8c7ad92cdc3ae989456d87b0332cb917da7e4e
Author: Ruifeng Zheng
AuthorDate: Tue Dec 5 13:05:36 2023 +0800

[SPARK-46255][PYTHON][CONNECT] Support complex type -> string conversion

### What changes were proposed in this pull request?

Support complex type -> string conversion.

### Why are the changes needed?

To support `list -> str` conversion.

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

CI

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44171 from zhengruifeng/py_connect_str_conv.

Authored-by: Ruifeng Zheng
Signed-off-by: Ruifeng Zheng
---
 python/pyspark/sql/connect/conversion.py | 16
 python/pyspark/sql/tests/test_types.py   | 13 +
 2 files changed, 13 insertions(+), 16 deletions(-)

```
diff --git a/python/pyspark/sql/connect/conversion.py b/python/pyspark/sql/connect/conversion.py
index 550978d02f85..fb5a2d4b17b1 100644
--- a/python/pyspark/sql/connect/conversion.py
+++ b/python/pyspark/sql/connect/conversion.py
@@ -222,22 +222,6 @@ class LocalDataToArrowConversion:
                 if value is None:
                     return None
                 else:
-                    # only atomic types are supported
-                    assert isinstance(
-                        value,
-                        (
-                            bool,
-                            int,
-                            float,
-                            str,
-                            bytes,
-                            bytearray,
-                            decimal.Decimal,
-                            datetime.date,
-                            datetime.datetime,
-                            datetime.timedelta,
-                        ),
-                    )
                     if isinstance(value, bool):
                         # To match the PySpark which convert bool to string in
                         # the JVM side (python.EvaluatePython.makeFromJava)
diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py
index b038cf6ce5ba..a07309d1dff9 100644
--- a/python/pyspark/sql/tests/test_types.py
+++ b/python/pyspark/sql/tests/test_types.py
@@ -507,6 +507,19 @@ class TypesTestsMixin:
         self.assertEqual(1, row.asDict()["l"][0].a)
         self.assertEqual(1.0, row.asDict()["d"]["key"].c)
 
+    def test_convert_list_to_str(self):
+        data = [[[123], 120]]
+        schema = StructType(
+            [
+                StructField("name", StringType(), True),
+                StructField("income", LongType(), True),
+            ]
+        )
+        df = self.spark.createDataFrame(data, schema)
+        self.assertEqual(df.schema, schema)
+        self.assertEqual(df.count(), 1)
+        self.assertEqual(df.head(), Row(name="[123]", income=120))
+
     def test_udt(self):
         from pyspark.sql.types import _parse_datatype_json_string, _infer_type, _make_type_verifier
```

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
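The deleted `assert` restricted the `StringType` path to atomic Python types; with it gone, any value falls through to the string branch, which is how `[123]` becomes `"[123]"` in the new test. A rough pure-Python distillation of the resulting behavior (an illustration only, not the actual `conversion.py` code):

```python
def to_string_cell(value):
    # Hypothetical distillation of the StringType conversion path after the change.
    if value is None:
        return None
    if isinstance(value, bool):
        # Match the JVM side, which renders booleans as "true"/"false".
        return "true" if value else "false"
    if isinstance(value, str):
        return value
    # Complex types (list, dict, ...) are no longer rejected by an assert;
    # they are rendered via their string form, e.g. [123] -> "[123]".
    return str(value)

assert to_string_cell([123]) == "[123]"
assert to_string_cell(True) == "true"
assert to_string_cell(None) is None
```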
(spark) branch master updated: [SPARK-46254][PYTHON] Remove stale Python 3.8/3.7 version checking
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f3e113b687be [SPARK-46254][PYTHON] Remove stale Python 3.8/3.7 version checking
f3e113b687be is described below

commit f3e113b687be328ba0d318ec03977aba3036b86f
Author: Hyukjin Kwon
AuthorDate: Tue Dec 5 11:44:37 2023 +0900

[SPARK-46254][PYTHON] Remove stale Python 3.8/3.7 version checking

### What changes were proposed in this pull request?

This PR proposes to remove stale Python 3.8/3.7 version checking in the codebase.

### Why are the changes needed?

To remove unnecessary version comparison.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested the version comparison.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44169 from HyukjinKwon/remove-python38.

Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
---
 python/pyspark/pandas/__init__.py               | 21 ++---
 python/pyspark/pandas/frame.py                  | 4 +---
 .../pandas/tests/computation/test_apply_func.py | 5 +++--
 python/pyspark/pandas/tests/test_typedef.py     | 6 --
 python/pyspark/pandas/typedef/typehints.py      | 7 +--
 5 files changed, 23 insertions(+), 20 deletions(-)

```
diff --git a/python/pyspark/pandas/__init__.py b/python/pyspark/pandas/__init__.py
index 44365c8e6b88..65366f544092 100644
--- a/python/pyspark/pandas/__init__.py
+++ b/python/pyspark/pandas/__init__.py
@@ -122,17 +122,16 @@ def _auto_patch_pandas() -> None:
     _frame_has_class_getitem = hasattr(pd.DataFrame, "__class_getitem__")
     _series_has_class_getitem = hasattr(pd.Series, "__class_getitem__")
 
-    if sys.version_info >= (3, 7):
-        # Just in case pandas implements '__class_getitem__' later.
-        if not _frame_has_class_getitem:
-            pd.DataFrame.__class_getitem__ = (  # type: ignore[attr-defined]
-                lambda params: DataFrame.__class_getitem__(params)
-            )
-
-        if not _series_has_class_getitem:
-            pd.Series.__class_getitem__ = (  # type: ignore[attr-defined]
-                lambda params: Series.__class_getitem__(params)
-            )
+    # Just in case pandas implements '__class_getitem__' later.
+    if not _frame_has_class_getitem:
+        pd.DataFrame.__class_getitem__ = (  # type: ignore[attr-defined]
+            lambda params: DataFrame.__class_getitem__(params)
+        )
+
+    if not _series_has_class_getitem:
+        pd.Series.__class_getitem__ = (  # type: ignore[attr-defined]
+            lambda params: Series.__class_getitem__(params)
+        )
 
 
 _auto_patch_spark()
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index a54316dffeb4..9846dc0ae10b 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -2103,9 +2103,7 @@ class DataFrame(Frame, Generic[T]):
             v = [row[c] for c in data_spark_column_names]
             return k, v
 
-        can_return_named_tuples = sys.version_info >= (3, 7) or len(self.columns) + index < 255
-
-        if name is not None and can_return_named_tuples:
+        if name is not None:
             itertuple = namedtuple(name, fields, rename=True)  # type: ignore[misc]
             for k, v in map(
                 extract_kv_from_spark_row,
diff --git a/python/pyspark/pandas/tests/computation/test_apply_func.py b/python/pyspark/pandas/tests/computation/test_apply_func.py
index 93d9d56a479a..00b14441991a 100644
--- a/python/pyspark/pandas/tests/computation/test_apply_func.py
+++ b/python/pyspark/pandas/tests/computation/test_apply_func.py
@@ -23,6 +23,7 @@ import numpy as np
 import pandas as pd
 
 from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
 from pyspark.pandas.config import option_context
 from pyspark.testing.pandasutils import ComparisonTestBase
 from pyspark.testing.sqlutils import SQLTestUtils
@@ -252,8 +253,8 @@ class FrameApplyFunctionMixin:
         actual.columns = ["a", "b"]
         self.assert_eq(actual, pdf)
 
-        # For NumPy typing, NumPy version should be 1.21+ and Python version should be 3.8+
-        if sys.version_info >= (3, 8):
+        # For NumPy typing, NumPy version should be 1.21+
+        if LooseVersion(np.__version__) >= LooseVersion("1.21"):
             import numpy.typing as ntp
 
             psdf = ps.from_pandas(pdf)
diff --git a/python/pyspark/pandas/tests/test_typedef.py b/python/pyspark/pandas/tests/test_typedef.py
index 52913fb65f09..e8095ce4ba06 100644
--- a/python/pyspark/pandas/tests/test_typedef.py
+++ b/python/pyspark/pandas/tests/test_typedef.py
@@ -25,6 +25,8 @@
 import pandas
 import
```
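The pattern throughout this diff is to drop interpreter-version gates (always true on the Python 3.8+ floor Spark supports) while keeping genuine library-version gates such as the NumPy 1.21 check. A minimal, dependency-free sketch of the version-tuple comparison such a gate relies on (`LooseVersion` itself lives in `pyspark.loose_version`; this helper is a simplified stand-in):

```python
def version_tuple(v):
    # Parse "1.21.0" -> (1, 21, 0); stop at the first non-numeric
    # component, so pre-release suffixes like "2.0.0rc1" are ignored.
    parts = []
    for p in v.split("."):
        if not p.isdigit():
            break
        parts.append(int(p))
    return tuple(parts)

# A library-version gate remains meaningful:
assert version_tuple("1.21.0") >= version_tuple("1.21")
assert not version_tuple("1.20.3") >= version_tuple("1.21")
```

Tuple comparison gives the expected lexicographic ordering, which is why an interpreter gate like `sys.version_info >= (3, 8)` is now vacuous and safe to delete.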
(spark) branch master updated: [SPARK-46043][SQL] Support create table using DSv2 sources
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5fec76dc8db2 [SPARK-46043][SQL] Support create table using DSv2 sources
5fec76dc8db2 is described below

commit 5fec76dc8db2499b0a9d76231f9a250871d59658
Author: allisonwang-db
AuthorDate: Tue Dec 5 09:35:14 2023 +0800

[SPARK-46043][SQL] Support create table using DSv2 sources

### What changes were proposed in this pull request?

This PR supports `CREATE TABLE ... USING source` for DSv2 sources.

### Why are the changes needed?

To support creating DSv2 tables in SQL. Currently the `CREATE TABLE` itself can work, but when you select from a DSv2 table created in SQL, it fails with this error:

```
org.apache.spark.sql.AnalysisException: org.apache.spark.sql.connector.SimpleDataSourceV2 is not a valid Spark SQL Data Source.
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43949 from allisonwang-db/spark-46043-dsv2-create-table.

Authored-by: allisonwang-db
Signed-off-by: Wenchen Fan
---
 .../src/main/resources/error/error-classes.json    | 13 +++
 ...-cannot-create-data-source-table-error-class.md | 32 ++
 docs/sql-error-conditions.md                       | 8 ++
 .../spark/sql/catalyst/util/QuotingUtils.scala     | 8 ++
 .../sql/connector/catalog/CatalogV2Implicits.scala | 6 ++
 .../datasources/v2/DataSourceV2Relation.scala      | 19 +++-
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 14 +--
 .../datasources/v2/DataSourceV2Utils.scala         | 16 +++
 .../datasources/v2/V2SessionCatalog.scala          | 104 +---
 .../connector/JavaSchemaRequiredDataSource.java    | 4 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 15 +--
 .../spark/sql/connector/DataSourceV2Suite.scala    | 108 -
 .../spark/sql/connector/FakeV2Provider.scala       | 63
 .../spark/sql/connector/InsertIntoTests.scala      | 12 ++-
 .../sql/connector/TestV2SessionCatalogBase.scala   | 5 +-
 .../spark/sql/connector/V1WriteFallbackSuite.scala | 3 +-
 .../execution/command/PlanResolutionSuite.scala    | 2 +-
 17 files changed, 384 insertions(+), 48 deletions(-)

```
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index a808be9510cf..e54d346e1bc1 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -149,6 +149,19 @@
     ],
     "sqlState" : "42846"
   },
+  "CANNOT_CREATE_DATA_SOURCE_TABLE" : {
+    "message" : [
+      "Failed to create data source table <tableName>:"
+    ],
+    "subClass" : {
+      "EXTERNAL_METADATA_UNSUPPORTED" : {
+        "message" : [
+          "provider '<provider>' does not support external metadata but a schema is provided. Please remove the schema when creating the table."
+        ]
+      }
+    },
+    "sqlState" : "42KDE"
+  },
   "CANNOT_DECODE_URL" : {
     "message" : [
       "The provided URL cannot be decoded: <url>. Please ensure that the URL is properly formatted and try again."
diff --git a/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
new file mode 100644
index ..f52e4f462bc9
--- /dev/null
+++ b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
@@ -0,0 +1,32 @@
+---
+layout: global
+title: CANNOT_CREATE_DATA_SOURCE_TABLE error class
+displayTitle: CANNOT_CREATE_DATA_SOURCE_TABLE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to create data source table `<tableName>`:
+
+This error class has the following derived error classes:
+
+## EXTERNAL_METADATA_UNSUPPORTED
+
+provider '`<provider>`'
```
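The new error class above is a message template whose `<angle-bracket>` placeholders are filled in at raise time. A hedged sketch of how such a template might be instantiated (illustration only; Spark's actual formatting lives in the JVM-side error framework):

```python
# Template text taken from the CANNOT_CREATE_DATA_SOURCE_TABLE entry above.
TEMPLATE = (
    "Failed to create data source table <tableName>: "
    "provider '<provider>' does not support external metadata "
    "but a schema is provided."
)

def format_error(template, params):
    # Naive placeholder substitution, purely for illustration.
    for key, value in params.items():
        template = template.replace(f"<{key}>", value)
    return template

msg = format_error(
    TEMPLATE,
    {"tableName": "t", "provider": "SimpleDataSourceV2"},
)
assert msg.startswith("Failed to create data source table t:")
assert "provider 'SimpleDataSourceV2'" in msg
```

Keeping messages in a central JSON file like this lets every raise site share one wording and one SQLSTATE (`42KDE` here).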
(spark) branch master updated: Revert "[SPARK-46213][PYTHON] Introduce `PySparkImportError` for error framework"
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 7f59565b9fc1 Revert "[SPARK-46213][PYTHON] Introduce `PySparkImportError` for error framework"
7f59565b9fc1 is described below

commit 7f59565b9fc19c496bc7600e168650e7663c0065
Author: Hyukjin Kwon
AuthorDate: Tue Dec 5 10:29:25 2023 +0900

Revert "[SPARK-46213][PYTHON] Introduce `PySparkImportError` for error framework"

This reverts commit 75b0eb2d601763847507a5e715b3732db004544a.
---
 python/docs/source/reference/pyspark.errors.rst | 1 -
 python/pyspark/errors/__init__.py               | 2 --
 python/pyspark/errors/error_classes.py          | 10 ---
 python/pyspark/errors/exceptions/base.py        | 6
 python/pyspark/sql/connect/utils.py             | 36 +++
 python/pyspark/sql/pandas/utils.py              | 38 +++++--
 6 files changed, 21 insertions(+), 72 deletions(-)

```
diff --git a/python/docs/source/reference/pyspark.errors.rst b/python/docs/source/reference/pyspark.errors.rst
index a4997506b41e..56fdde2584c5 100644
--- a/python/docs/source/reference/pyspark.errors.rst
+++ b/python/docs/source/reference/pyspark.errors.rst
@@ -44,7 +44,6 @@ Classes
     PySparkRuntimeError
     PySparkTypeError
     PySparkValueError
-    PySparkImportError
     PySparkIndexError
     PythonException
     QueryExecutionException
diff --git a/python/pyspark/errors/__init__.py b/python/pyspark/errors/__init__.py
index 07033d216432..0a55084a4a59 100644
--- a/python/pyspark/errors/__init__.py
+++ b/python/pyspark/errors/__init__.py
@@ -39,7 +39,6 @@ from pyspark.errors.exceptions.base import (  # noqa: F401
     SparkNoSuchElementException,
     PySparkTypeError,
     PySparkValueError,
-    PySparkImportError,
     PySparkIndexError,
     PySparkAttributeError,
     PySparkRuntimeError,
@@ -71,7 +70,6 @@ __all__ = [
     "SparkNoSuchElementException",
     "PySparkTypeError",
     "PySparkValueError",
-    "PySparkImportError",
     "PySparkIndexError",
     "PySparkAttributeError",
     "PySparkRuntimeError",
diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index 8ca73ca85de6..e0ca8a938ec2 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -682,11 +682,6 @@ ERROR_CLASSES_JSON = """
       "Only a single trigger is allowed."
     ]
   },
-  "PACKAGE_NOT_INSTALLED" : {
-    "message" : [
-      "<package_name> >= <minimum_version> must be installed; however, it was not found."
-    ]
-  },
   "PIPE_FUNCTION_EXITED" : {
     "message" : [
       "Pipe function `<func_name>` exited with error code <error_code>."
@@ -943,11 +938,6 @@ ERROR_CLASSES_JSON = """
       "<feature> is not supported."
     ]
   },
-  "UNSUPPORTED_PACKAGE_VERSION" : {
-    "message" : [
-      "<package_name> >= <minimum_version> must be installed; however, your version is <current_version>."
-    ]
-  },
   "UNSUPPORTED_PARAM_TYPE_FOR_HIGHER_ORDER_FUNCTION" : {
     "message" : [
       "Function `<func_name>` should use only POSITIONAL or POSITIONAL OR KEYWORD arguments."
diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py
index 0f4001483b7f..e7f1e4386d7a 100644
--- a/python/pyspark/errors/exceptions/base.py
+++ b/python/pyspark/errors/exceptions/base.py
@@ -260,12 +260,6 @@ class PySparkPicklingError(PySparkException, PicklingError):
     """
 
 
-class PySparkImportError(PySparkException, ImportError):
-    """
-    Wrapper class for ImportError to support error classes.
-    """
-
-
 class PySparkKeyError(PySparkException, KeyError):
     """
     Wrapper class for KeyError to support error classes.
diff --git a/python/pyspark/sql/connect/utils.py b/python/pyspark/sql/connect/utils.py
index 88f26202b0b2..fd85d75060b5 100644
--- a/python/pyspark/sql/connect/utils.py
+++ b/python/pyspark/sql/connect/utils.py
@@ -18,7 +18,6 @@ import sys
 
 from pyspark.loose_version import LooseVersion
 from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
-from pyspark.errors import PySparkImportError
 
 
 def check_dependencies(mod_name: str) -> None:
@@ -46,21 +45,13 @@ def require_minimum_grpc_version() -> None:
     try:
         import grpc
     except ImportError as error:
-        raise PySparkImportError(
-            error_class="PACKAGE_NOT_INSTALLED",
-            message_parameters={
-                "package_name:": "grpcio",
-                "minimum_version": str(minimum_grpc_version),
-            },
+        raise ImportError(
+            f"grpcio >= {minimum_grpc_version} must be installed; however, it was not found."
         ) from error
     if LooseVersion(grpc.__version__) < LooseVersion(minimum_grpc_version):
-        raise PySparkImportError(
-            error_class="UNSUPPORTED_PACKAGE_VERSION",
-            message_parameters={
-
```
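The reverted `PySparkImportError` inherited from both `PySparkException` and `ImportError`, so existing `except ImportError:` handlers kept working while gaining an error class and template parameters. A standalone sketch of that dual-inheritance pattern (simplified; the real class also carries the full PySpark error framework):

```python
class PySparkImportError(ImportError):
    """Sketch: an ImportError carrying an error class and template parameters.
    (The reverted class also inherited PySparkException; simplified here.)"""

    def __init__(self, error_class, message_parameters):
        self.error_class = error_class
        self.message_parameters = message_parameters
        super().__init__(f"[{error_class}] {message_parameters}")

caught = None
try:
    raise PySparkImportError(
        error_class="PACKAGE_NOT_INSTALLED",
        message_parameters={"package_name": "grpcio", "minimum_version": "1.48.1"},
    )
except ImportError as e:  # plain ImportError handlers still catch it
    caught = e

assert caught.error_class == "PACKAGE_NOT_INSTALLED"
assert caught.message_parameters["package_name"] == "grpcio"
```

Note the deleted code in the diff above contains a stray colon in the dict key (`"package_name:"`), which would have made the template parameter unresolvable; the revert returns to plain f-string messages.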
(spark) branch master updated: [SPARK-46009][SQL][CONNECT] Merge the parse rule of PercentileCont and PercentileDisc into functionCall
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f1283c126878 [SPARK-46009][SQL][CONNECT] Merge the parse rule of PercentileCont and PercentileDisc into functionCall
f1283c126878 is described below

commit f1283c12687853f9cd190f8db69d97abe16a2d88
Author: Jiaan Geng
AuthorDate: Tue Dec 5 09:28:30 2023 +0800

[SPARK-46009][SQL][CONNECT] Merge the parse rule of PercentileCont and PercentileDisc into functionCall

### What changes were proposed in this pull request?

The Spark SQL parser has a special rule to parse `[percentile_cont|percentile_disc](percentage) WITHIN GROUP (ORDER BY v)`. We should merge this rule into `functionCall`.

### Why are the changes needed?

Merge the parse rule of `PercentileCont` and `PercentileDisc` into `functionCall`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43910 from beliefer/SPARK-46009.

Authored-by: Jiaan Geng
Signed-off-by: Jiaan Geng
---
 common/utils/src/main/resources/error/README.md    | 1 +
 .../src/main/resources/error/error-classes.json    | 23 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  | 7 +-
 ...id-inverse-distribution-function-error-class.md | 40 +++
 docs/sql-error-conditions.md                       | 8 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     | 4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 29 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   | 2 +
 .../spark/sql/catalyst/analysis/unresolved.scala   | 32 ++-
 .../aggregate/SupportsOrderingWithinGroup.scala    | 27 ++
 .../expressions/aggregate/percentiles.scala        | 81 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 34 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  | 24 ++
 .../sql/catalyst/parser/PlanParserSuite.scala      | 49 +++
 .../sql-functions/sql-expression-schema.md         | 2 +
 .../sql-tests/analyzer-results/percentiles.sql.out | 275 +++
 .../resources/sql-tests/inputs/percentiles.sql     | 48
 .../sql-tests/results/percentiles.sql.out          | 299 +
 18 files changed, 922 insertions(+), 63 deletions(-)

```
diff --git a/common/utils/src/main/resources/error/README.md b/common/utils/src/main/resources/error/README.md
index c9fdd84e7442..556a634e9927 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -1309,6 +1309,7 @@ The following SQLSTATEs are collated from:
 |HZ320|HZ |RDA-specific condition|320 |version not supported |RDA/SQL|Y |RDA/SQL |
 |HZ321|HZ |RDA-specific condition|321 |TCP/IP error|RDA/SQL|Y |RDA/SQL |
 |HZ322|HZ |RDA-specific condition|322 |TLS alert |RDA/SQL|Y |RDA/SQL |
+|ID001|IM |Invalid inverse distribution function |001 |Invalid inverse distribution function |SQL/Foundation |N |SQL/Foundation PostgreSQL Oracle Snowflake Redshift H2 |
 |IM001|IM |ODBC driver |001 |Driver does not support this function |SQL Server |N |SQL Server |
 |IM002|IM |ODBC driver |002 |Data source name not found and no default driver specified |SQL Server |N |SQL Server |
 |IM003|IM |ODBC driver |003 |Specified driver could not be loaded|SQL Server |N |SQL Server |
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 6795ebcb0bd0..a808be9510cf 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1858,6 +1858,29 @@
   },
     "sqlState" : "42000"
   },
+  "INVALID_INVERSE_DISTRIBUTION_FUNCTION" : {
+    "message" : [
```
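For context on the syntax being re-parsed above: `percentile_disc(p) WITHIN GROUP (ORDER BY v)` returns the first value in sort order whose cumulative distribution reaches `p`. The semantics can be sketched in plain Python (an illustration of the SQL behavior, not Spark's implementation):

```python
import math

def percentile_disc(values, fraction):
    # Discrete percentile: the smallest ordered value whose cumulative
    # fraction of rows (cume_dist) is >= the requested fraction.
    ordered = sorted(values)
    idx = max(math.ceil(fraction * len(ordered)) - 1, 0)
    return ordered[idx]

# With four rows, each value covers a 0.25 slice of the distribution.
assert percentile_disc([10, 20, 30, 40], 0.5) == 20
assert percentile_disc([10, 20, 30, 40], 0.75) == 30
assert percentile_disc([10, 20, 30, 40], 1.0) == 40
```

`percentile_cont` differs only in that it interpolates between the two adjacent ordered values instead of picking an existing one.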
(spark) branch master updated: [SPARK-46229][PYTHON][CONNECT][FOLLOW-UP] Remove unnecessary Python version check lower than 3.8
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 7af6a666859d [SPARK-46229][PYTHON][CONNECT][FOLLOW-UP] Remove unnecessary Python version check lower than 3.8
7af6a666859d is described below

commit 7af6a666859d2f614e9937375ea518f254bbad06
Author: Hyukjin Kwon
AuthorDate: Tue Dec 5 09:54:37 2023 +0900

[SPARK-46229][PYTHON][CONNECT][FOLLOW-UP] Remove unnecessary Python version check lower than 3.8

### What changes were proposed in this pull request?

This PR addresses the review comment https://github.com/apache/spark/pull/44146#discussion_r1414336068.

### Why are the changes needed?

We don't support Python lower than 3.8, so we can remove that if-else on the Python version.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests via CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44168 from HyukjinKwon/SPARK-46229-followup.

Lead-authored-by: Hyukjin Kwon
Co-authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/connect/_typing.py | 10 +-
 1 file changed, 1 insertion(+), 9 deletions(-)

```
diff --git a/python/pyspark/sql/connect/_typing.py b/python/pyspark/sql/connect/_typing.py
index 392c62bf50d3..1b8516427dbd 100644
--- a/python/pyspark/sql/connect/_typing.py
+++ b/python/pyspark/sql/connect/_typing.py
@@ -14,16 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import sys
-
-if sys.version_info >= (3, 8):
-    from typing import Protocol, Tuple
-else:
-    from typing_extensions import Protocol
-
-    from typing import Tuple
 from types import FunctionType
-from typing import Any, Callable, Iterable, Union, Optional, NewType
+from typing import Any, Callable, Iterable, Union, Optional, NewType, Protocol, Tuple
 
 import datetime
 import decimal
```

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
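The `typing_extensions` fallback removed above existed only for Python 3.7: `typing.Protocol` has shipped in the standard library since 3.8. A minimal example of what `Protocol` provides (structural typing, no explicit inheritance needed):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class SupportsClose(Protocol):
    def close(self) -> None:
        ...

class Resource:
    # No inheritance from SupportsClose: matching the method
    # signature is enough (structural, "duck" typing).
    def close(self) -> None:
        pass

assert isinstance(Resource(), SupportsClose)
assert not isinstance(object(), SupportsClose)
```

`runtime_checkable` is only needed for the `isinstance` checks here; for static type checking alone, a plain `Protocol` subclass suffices.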
(spark) branch master updated: [SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 91a02e5d9701 [SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions
91a02e5d9701 is described below

commit 91a02e5d97011f5f9b620c07b1c2f7d85291448b
Author: Daniel Tenedorio
AuthorDate: Mon Dec 4 16:51:56 2023 -0800

[SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions

### What changes were proposed in this pull request?

This PR updates the Python user-defined table function (UDTF) API for the `analyze` method to support general expressions for the `partitionBy` and `orderBy` fields of the `AnalyzeResult` class.

For example, the following UDTF specifies to partition by `partition_col / 10` so that all rows with values of this column between 0-9 arrive in the same partition, then all rows with values between 10-19 in the next partition, and so on.

```python
@udtf
class TestUDTF:
    def __init__(self):
        self._partition_col = None
        self._count = 0
        self._sum = 0
        self._last = None

    @staticmethod
    def analyze(*args, **kwargs):
        return AnalyzeResult(
            schema=StructType()
            .add("partition_col", IntegerType())
            .add("count", IntegerType())
            .add("total", IntegerType())
            .add("last", IntegerType()),
            partitionBy=[PartitioningColumn("partition_col / 10")],
            orderBy=[
                OrderingColumn(name="input", ascending=True, overrideNullsFirst=False)
            ],
        )

    def eval(self, row: Row):
        self._partition_col = row["partition_col"]
        self._count += 1
        self._last = row["input"]
        if row["input"] is not None:
            self._sum += row["input"]

    def terminate(self):
        yield self._partition_col, self._count, self._sum, self._last
```

### Why are the changes needed?

This lets the UDTF partition by simple references to the columns of the input table just like before, but also gives the option to partition by general expressions based on those columns (just like the explicit `PARTITION BY` and `ORDER BY` clauses in the UDTF call in SQL).

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR includes test coverage.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43946 from dtenedor/udtf-order-partition-by-exprs.

Authored-by: Daniel Tenedorio
Signed-off-by: Takuya UESHIN
---
 .../src/main/resources/error/error-classes.json    | 6 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  | 4 +-
 docs/sql-error-conditions.md                       | 6 +++
 python/docs/source/user_guide/sql/python_udtf.rst  | 2 +-
 python/pyspark/sql/udtf.py                         | 21 +
 .../spark/sql/errors/QueryCompilationErrors.scala  | 7 +++
 .../org/apache/spark/sql/UDTFRegistration.scala    | 4 +-
 .../python/UserDefinedPythonFunction.scala         | 53 -
 .../sql-tests/analyzer-results/udtf/udtf.sql.out   | 47 +++
 .../test/resources/sql-tests/inputs/udtf/udtf.sql  | 3 ++
 .../resources/sql-tests/results/udtf/udtf.sql.out  | 54 ++
 .../apache/spark/sql/IntegratedUDFTestUtils.scala  | 39 +---
 12 files changed, 206 insertions(+), 40 deletions(-)

```
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 9e0019b34728..6795ebcb0bd0 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3110,6 +3110,12 @@
     ],
     "sqlState" : "42802"
   },
+  "UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD" : {
+    "message" : [
+      "Failed to evaluate the user-defined table function because its 'analyze' method returned a requested OrderingColumn whose column name expression included an unnecessary alias <aliasName>; please remove this alias and then try the query again."
+    ],
+    "sqlState" : "42802"
+  },
   "UNABLE_TO_ACQUIRE_MEMORY" : {
     "message" : [
       "Unable to acquire <requestedBytes> bytes of memory, got <receivedBytes>."
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b64fecafa311..dc1730c78267 100644
---
```
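The `PartitioningColumn("partition_col / 10")` in the example above buckets rows by an expression rather than a raw column. The grouping effect can be sketched in plain Python, with integer division standing in for the SQL expression (illustration only, not how Spark shuffles data):

```python
from collections import defaultdict

rows = [
    {"partition_col": 3, "input": 10},
    {"partition_col": 9, "input": 20},
    {"partition_col": 12, "input": 7},
    {"partition_col": 19, "input": None},
]

partitions = defaultdict(list)
for row in rows:
    # Rows with partition_col 0-9 share bucket 0, 10-19 bucket 1, and so on —
    # the same buckets the expression "partition_col / 10" induces in SQL.
    partitions[row["partition_col"] // 10].append(row)

assert sorted(partitions) == [0, 1]
assert len(partitions[0]) == 2 and len(partitions[1]) == 2
```

Each bucket then corresponds to one `eval`/`terminate` lifecycle of the UDTF, which is why `terminate` in the example can emit a single aggregated row per partition.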
(spark) branch master updated: [SPARK-46233][PYTHON] Migrate all remaining `AttributeError` into PySpark error framework
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new be49ca6dd71b [SPARK-46233][PYTHON] Migrate all remaining `AttributeError` into PySpark error framework
be49ca6dd71b is described below

commit be49ca6dd71b87172df9d88f305f06a7b87c9ecf
Author: Haejoon Lee
AuthorDate: Mon Dec 4 16:18:27 2023 -0800

[SPARK-46233][PYTHON] Migrate all remaining `AttributeError` into PySpark error framework

### What changes were proposed in this pull request?

This PR proposes to migrate all remaining `AttributeError` from `pyspark/sql/*` into the PySpark error framework, `PySparkAttributeError`, with dedicated error classes assigned.

### Why are the changes needed?

To improve the error handling in PySpark.

### Does this PR introduce _any_ user-facing change?

No API changes, but the user-facing error messages will be improved.

### How was this patch tested?

The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44150 from itholic/migrate_attribute_error.

Authored-by: Haejoon Lee
Signed-off-by: Dongjoon Hyun
---
 python/pyspark/sql/connect/dataframe.py | 10 +++---
 python/pyspark/sql/dataframe.py         | 11 ---
 python/pyspark/sql/types.py             | 13 ++---
 3 files changed, 25 insertions(+), 9 deletions(-)

```
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index a73a24818c0c..6a1d45712163 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -14,7 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from pyspark.errors.exceptions.base import SessionNotSameException, PySparkIndexError
+from pyspark.errors.exceptions.base import (
+    SessionNotSameException,
+    PySparkIndexError,
+    PySparkAttributeError,
+)
 from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
@@ -1694,8 +1698,8 @@ class DataFrame:
         )
 
         if name not in self.columns:
-            raise AttributeError(
-                "'%s' object has no attribute '%s'" % (self.__class__.__name__, name)
+            raise PySparkAttributeError(
+                error_class="ATTRIBUTE_NOT_SUPPORTED", message_parameters={"attr_name": name}
             )
 
         return _to_col_with_plan_id(
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 8b40b222a289..5211d874ba33 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -43,7 +43,12 @@ from py4j.java_gateway import JavaObject, JVMView
 from pyspark import copy_func, _NoValue
 from pyspark._globals import _NoValueType
 from pyspark.context import SparkContext
-from pyspark.errors import PySparkTypeError, PySparkValueError, PySparkIndexError
+from pyspark.errors import (
+    PySparkTypeError,
+    PySparkValueError,
+    PySparkIndexError,
+    PySparkAttributeError,
+)
 from pyspark.rdd import (
     RDD,
     _load_from_socket,
@@ -3613,8 +3618,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         +---+
         """
         if name not in self.columns:
-            raise AttributeError(
-                "'%s' object has no attribute '%s'" % (self.__class__.__name__, name)
+            raise PySparkAttributeError(
+                error_class="ATTRIBUTE_NOT_SUPPORTED", message_parameters={"attr_name": name}
             )
         jc = self._jdf.apply(name)
         return Column(jc)
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index cbfc4ab5df02..d3eed77b3838 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -55,6 +55,7 @@ from pyspark.errors import (
     PySparkTypeError,
     PySparkValueError,
     PySparkIndexError,
+    PySparkAttributeError,
     PySparkKeyError,
 )
 
@@ -2574,16 +2575,22 @@ class Row(tuple):
 
     def __getattr__(self, item:
```
str) -> Any: if item.startswith("__"): -raise AttributeError(item) +raise PySparkAttributeError( +error_class="ATTRIBUTE_NOT_SUPPORTED", message_parameters={"attr_name": item} +) try: # it will be slow when it has many fields, # but this will not be used in normal cases idx = self.__fields__.index(item) return self[idx] except IndexError: -raise AttributeError(item) +raise PySparkAttributeError( +error_class="ATTRIBUTE_NOT_SUPPORTED", message_parameters={"attr_name": item} +) except ValueError: -raise AttributeError(item)
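The migration above swaps bare `AttributeError(item)` for `PySparkAttributeError` with a dedicated error class and structured `message_parameters`. The shape of such an error framework can be sketched in a few lines; everything below (`ERROR_CLASSES`, `SketchAttributeError`, the toy `Row`) is illustrative and not PySpark's actual implementation:

```python
# Minimal sketch of an error-class-based exception framework, loosely
# modeled on PySpark's. Templates are keyed by error class and filled
# from structured message parameters.
ERROR_CLASSES = {
    "ATTRIBUTE_NOT_SUPPORTED": "Attribute `{attr_name}` is not supported."
}

class SketchAttributeError(AttributeError):
    def __init__(self, error_class: str, message_parameters: dict):
        self.error_class = error_class
        self.message_parameters = message_parameters
        template = ERROR_CLASSES[error_class]
        super().__init__(template.format(**message_parameters))

class Row:
    """Toy stand-in for an object with dynamic attribute access."""
    def __init__(self, **fields):
        self._fields = fields

    def __getattr__(self, item):
        try:
            return self._fields[item]
        except KeyError:
            # Structured error instead of a bare AttributeError(item):
            raise SketchAttributeError(
                error_class="ATTRIBUTE_NOT_SUPPORTED",
                message_parameters={"attr_name": item},
            ) from None

row = Row(name="Alice", age=1)
assert row.name == "Alice"
```

The benefit over a bare `AttributeError` is that callers can match on `error_class` programmatically while users still get a readable, consistently formatted message.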
(spark) branch master updated: [SPARK-45684][SQL][SS][TESTS][FOLLOWUP] Use `++` instead of `s.c.SeqOps#concat`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 19e68ba34098 [SPARK-45684][SQL][SS][TESTS][FOLLOWUP] Use `++` instead of `s.c.SeqOps#concat` 19e68ba34098 is described below commit 19e68ba34098baeef617b31c227a14efc275c46d Author: yangjie01 AuthorDate: Tue Dec 5 08:50:29 2023 +0900 [SPARK-45684][SQL][SS][TESTS][FOLLOWUP] Use `++` instead of `s.c.SeqOps#concat` ### What changes were proposed in this pull request? This PR uses `++` instead of `s.c.SeqOps#concat` to address comments: https://github.com/apache/spark/pull/43575#discussion_r1414163363 ### Why are the changes needed? `++` is an alias for `concat`, but `++` is more readable. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #44161 from LuciferYang/SPARK-45684-FOLLOWUP.
Authored-by: yangjie01 Signed-off-by: Hyukjin Kwon --- .../src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 883f64ff7af4..cd0bbfd47b2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -174,7 +174,7 @@ class StreamSuite extends StreamTest { try { query.processAllAvailable() val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long] - checkDatasetUnorderly[Long](outputDf, (0L to 10L).concat(0L to 10L): _*) + checkDatasetUnorderly[Long](outputDf, (0L to 10L) ++ (0L to 10L): _*) } finally { query.stop() } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated (d314e4bba54a -> d5f2539f83f6)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from d314e4bba54a [SPARK-46250][CONNECT][SS] Deflake test_parity_listener add d5f2539f83f6 [SPARK-45940][FOLLOWUP][TESTS] Only test Python data source when Python and PySpark environments are available No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala | 3 +++ 1 file changed, 3 insertions(+)
(spark) branch master updated: [SPARK-46250][CONNECT][SS] Deflake test_parity_listener
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d314e4bba54a [SPARK-46250][CONNECT][SS] Deflake test_parity_listener d314e4bba54a is described below commit d314e4bba54a08babad8f8ecd564e7440c0e97f4 Author: Wei Liu AuthorDate: Tue Dec 5 08:48:23 2023 +0900 [SPARK-46250][CONNECT][SS] Deflake test_parity_listener ### What changes were proposed in this pull request? We didn’t give the onQueryTerminated handler enough time to write the data to that table, and hence it's possible that we read from a nonexistent table `listener_terminated_event`. ### Why are the changes needed? Deflake existing tests ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test-only change ### Was this patch authored or co-authored using generative AI tooling? Closes #44166 from WweiL/SPARK-46250-deflaky-parity-listener.
Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/tests/connect/streaming/test_parity_listener.py | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py b/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py index ca02cf29ee7d..4fc040642bed 100644 --- a/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py +++ b/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py @@ -68,11 +68,14 @@ class StreamingListenerParityTests(StreamingListenerTestsMixin, ReusedConnectTes ) self.assertTrue(q.isActive) -time.sleep(10) -self.assertTrue(q.lastProgress["batchId"] > 0) # ensure at least one batch is ran +# ensure at least one batch is ran +while q.lastProgress is None or q.lastProgress["batchId"] == 0: +time.sleep(5) q.stop() self.assertFalse(q.isActive) +time.sleep(60) # Sleep to make sure listener_terminated_events is written successfully + start_event = pyspark.cloudpickle.loads( self.spark.read.table("listener_start_events").collect()[0][0] ) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
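The fix above replaces a fixed `time.sleep(10)` with a poll loop that waits for `lastProgress` to advance. The general pattern is worth sketching: poll a condition with a deadline instead of guessing a sleep duration. The helper below is an illustrative sketch (`wait_for` and `fake_progress` are names invented here, not part of PySpark):

```python
import time

def wait_for(condition, timeout=60.0, interval=0.1):
    """Poll `condition` until it returns True or `timeout` seconds elapse.

    Generic version of the deflake fix: instead of a fixed sleep followed
    by one assertion, keep checking and fail only after the deadline.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return False

# Stand-in for q.lastProgress["batchId"] becoming > 0 after a few batches.
state = {"batch_id": None}

def fake_progress():
    state["batch_id"] = 0 if state["batch_id"] is None else state["batch_id"] + 1
    return state["batch_id"] is not None and state["batch_id"] > 0

assert wait_for(fake_progress, timeout=5.0, interval=0.01)
```

Note the patch still uses an unconditional `time.sleep(60)` before reading `listener_terminated_events`; the same polling idea could in principle replace that sleep as well.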
(spark) branch master updated: [SPARK-46243][SQL][DOCS] Describe arguments of `decode()`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 47e8e02141d4 [SPARK-46243][SQL][DOCS] Describe arguments of `decode()` 47e8e02141d4 is described below commit 47e8e02141d4f56f5cb92ec4762c9ea5c9b91e90 Author: Max Gekk AuthorDate: Tue Dec 5 08:31:49 2023 +0900 [SPARK-46243][SQL][DOCS] Describe arguments of `decode()` ### What changes were proposed in this pull request? In the PR, I propose to update the description of the `StringDecode` expression and apparently the `decode()` function by describing the arguments `bin` and `charset`. The updated docs: https://github.com/apache/spark/assets/1580697/a177f81d-2f39-45ff-bc28-b217dad4e128;> ### Why are the changes needed? To improve user experience with Spark SQL by documenting the public function. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By manually checking the generated docs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44157 from MaxGekk/doc-decode-params. Authored-by: Max Gekk Signed-off-by: Hyukjin Kwon --- .../sql/catalyst/expressions/stringExpressions.scala | 16 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 7c5d65d2b958..259556826ad9 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -2594,6 +2594,11 @@ object Decode { the corresponding result. If no match is found, then it returns default. If default is omitted, it returns null. 
""", + arguments = """ +Arguments: + * bin - a binary expression to decode + * charset - one of the charsets 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16' to decode `bin` into a STRING. It is case insensitive. + """, examples = """ Examples: > SELECT _FUNC_(encode('abc', 'utf-8'), 'utf-8'); @@ -2623,18 +2628,21 @@ case class Decode(params: Seq[Expression], replacement: Expression) } /** - * Decodes the first argument into a String using the provided character set - * (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'). - * If either argument is null, the result will also be null. + * Decodes the first argument into a String using the provided character set. */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(bin, charset) - Decodes the first argument using the second argument character set.", + usage = "_FUNC_(bin, charset) - Decodes the first argument using the second argument character set. If either argument is null, the result will also be null.", examples = """ Examples: > SELECT _FUNC_(encode('abc', 'utf-8'), 'utf-8'); abc """, + arguments = """ +Arguments: + * bin - a binary expression to decode + * charset - one of the charsets 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16' to decode `bin` into a STRING. It is case insensitive. + """, since = "1.5.0", group = "string_funcs") // scalastyle:on line.size.limit - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch branch-3.3 updated: [SPARK-46239][CORE] Hide `Jetty` info
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new aaec17fb244c [SPARK-46239][CORE] Hide `Jetty` info aaec17fb244c is described below commit aaec17fb244c175068f4de52e1288acc6125c5e9 Author: Dongjoon Hyun AuthorDate: Mon Dec 4 14:41:27 2023 -0800 [SPARK-46239][CORE] Hide `Jetty` info **What changes were proposed in this pull request?** The PR sets parameters to hide the version of jetty in spark. **Why are the changes needed?** It can avoid obtaining remote WWW service information through HTTP. **Does this PR introduce any user-facing change?** No **How was this patch tested?** Manual review **Was this patch authored or co-authored using generative AI tooling?** No Closes #44158 from chenyu-opensource/branch-SPARK-46239. Lead-authored-by: Dongjoon Hyun Co-authored-by: chenyu <119398199+chenyu-opensou...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun (cherry picked from commit ff4f59341215b7f3a87e6cd8798d49e25562fcd6) Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 6 ++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 834e4dfc4841..44bbd95fad13 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -311,6 +311,12 @@ private[spark] object JettyUtils extends Logging { logDebug(s"Using requestHeaderSize: $requestHeaderSize") httpConfig.setRequestHeaderSize(requestHeaderSize) + // Hide information. + logDebug("Using setSendServerVersion: false") + httpConfig.setSendServerVersion(false) + logDebug("Using setSendXPoweredBy: false") + httpConfig.setSendXPoweredBy(false) + // If SSL is configured, create the secure connector first. 
val securePort = sslOptions.createJettySslContextFactory().map { factory => val securePort = sslOptions.port.getOrElse(if (port > 0) Utils.userPort(port, 400) else 0)
(spark) branch branch-3.4 updated: [SPARK-46239][CORE] Hide `Jetty` info
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 757c3a9d62b7 [SPARK-46239][CORE] Hide `Jetty` info 757c3a9d62b7 is described below commit 757c3a9d62b7519f5bdc50d09e472b0490b6bae8 Author: Dongjoon Hyun AuthorDate: Mon Dec 4 14:41:27 2023 -0800 [SPARK-46239][CORE] Hide `Jetty` info **What changes were proposed in this pull request?** The PR sets parameters to hide the version of jetty in spark. **Why are the changes needed?** It can avoid obtaining remote WWW service information through HTTP. **Does this PR introduce any user-facing change?** No **How was this patch tested?** Manual review **Was this patch authored or co-authored using generative AI tooling?** No Closes #44158 from chenyu-opensource/branch-SPARK-46239. Lead-authored-by: Dongjoon Hyun Co-authored-by: chenyu <119398199+chenyu-opensou...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun (cherry picked from commit ff4f59341215b7f3a87e6cd8798d49e25562fcd6) Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 6 ++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index d8119fb94984..2407152a5498 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -312,6 +312,12 @@ private[spark] object JettyUtils extends Logging { logDebug(s"Using requestHeaderSize: $requestHeaderSize") httpConfig.setRequestHeaderSize(requestHeaderSize) + // Hide information. + logDebug("Using setSendServerVersion: false") + httpConfig.setSendServerVersion(false) + logDebug("Using setSendXPoweredBy: false") + httpConfig.setSendXPoweredBy(false) + // If SSL is configured, create the secure connector first. 
val securePort = sslOptions.createJettySslContextFactory().map { factory => val securePort = sslOptions.port.getOrElse(if (port > 0) Utils.userPort(port, 400) else 0)
(spark) branch branch-3.5 updated: [SPARK-46239][CORE] Hide `Jetty` info
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 1321b4e64dea [SPARK-46239][CORE] Hide `Jetty` info 1321b4e64dea is described below commit 1321b4e64deaa1e58bf297c25b72319083056568 Author: Dongjoon Hyun AuthorDate: Mon Dec 4 14:41:27 2023 -0800 [SPARK-46239][CORE] Hide `Jetty` info **What changes were proposed in this pull request?** The PR sets parameters to hide the version of jetty in spark. **Why are the changes needed?** It can avoid obtaining remote WWW service information through HTTP. **Does this PR introduce any user-facing change?** No **How was this patch tested?** Manual review **Was this patch authored or co-authored using generative AI tooling?** No Closes #44158 from chenyu-opensource/branch-SPARK-46239. Lead-authored-by: Dongjoon Hyun Co-authored-by: chenyu <119398199+chenyu-opensou...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun (cherry picked from commit ff4f59341215b7f3a87e6cd8798d49e25562fcd6) Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 6 ++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 9582bdbf5264..21753361e627 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -312,6 +312,12 @@ private[spark] object JettyUtils extends Logging { logDebug(s"Using requestHeaderSize: $requestHeaderSize") httpConfig.setRequestHeaderSize(requestHeaderSize) + // Hide information. + logDebug("Using setSendServerVersion: false") + httpConfig.setSendServerVersion(false) + logDebug("Using setSendXPoweredBy: false") + httpConfig.setSendXPoweredBy(false) + // If SSL is configured, create the secure connector first. 
val securePort = sslOptions.createJettySslContextFactory().map { factory => val securePort = sslOptions.port.getOrElse(if (port > 0) Utils.userPort(port, 400) else 0)
(spark) branch master updated: [SPARK-46239][CORE] Hide `Jetty` info
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ff4f59341215 [SPARK-46239][CORE] Hide `Jetty` info ff4f59341215 is described below commit ff4f59341215b7f3a87e6cd8798d49e25562fcd6 Author: Dongjoon Hyun AuthorDate: Mon Dec 4 14:41:27 2023 -0800 [SPARK-46239][CORE] Hide `Jetty` info **What changes were proposed in this pull request?** The PR sets parameters to hide the version of jetty in spark. **Why are the changes needed?** It can avoid obtaining remote WWW service information through HTTP. **Does this PR introduce any user-facing change?** No **How was this patch tested?** Manual review **Was this patch authored or co-authored using generative AI tooling?** No Closes #44158 from chenyu-opensource/branch-SPARK-46239. Lead-authored-by: Dongjoon Hyun Co-authored-by: chenyu <119398199+chenyu-opensou...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 6 ++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 22adcbc32ed8..50251975d733 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -314,6 +314,12 @@ private[spark] object JettyUtils extends Logging { logDebug(s"Using requestHeaderSize: $requestHeaderSize") httpConfig.setRequestHeaderSize(requestHeaderSize) + // Hide information. + logDebug("Using setSendServerVersion: false") + httpConfig.setSendServerVersion(false) + logDebug("Using setSendXPoweredBy: false") + httpConfig.setSendXPoweredBy(false) + // If SSL is configured, create the secure connector first. 
val securePort = sslOptions.createJettySslContextFactory().map { factory => val securePort = sslOptions.port.getOrElse(if (port > 0) Utils.userPort(port, 400) else 0)
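Jetty's `setSendServerVersion(false)` / `setSendXPoweredBy(false)` have no direct Python analogue, but the hardening idea — don't advertise server software in response headers — can be illustrated with the standard-library `http.server`. This is a sketch of the concept, not Spark or Jetty code:

```python
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

class QuietHandler(BaseHTTPRequestHandler):
    def send_response(self, code, message=None):
        # The stock send_response() also emits `Server:` (software name
        # and version) and `Date:`; send_response_only() writes just the
        # status line, so nothing about the server is disclosed.
        self.send_response_only(code, message)

    def do_GET(self):
        payload = b"ok"
        self.send_response(200)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format, *args):
        pass  # keep the demo silent

server = HTTPServer(("127.0.0.1", 0), QuietHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
with urllib.request.urlopen(f"http://127.0.0.1:{server.server_port}/") as resp:
    body = resp.read()
    server_header = resp.headers.get("Server")
server.shutdown()

assert body == b"ok"
assert server_header is None  # no software/version fingerprint leaked
```

Verifying the Spark change itself is analogous: request any UI endpoint and confirm the response carries neither a `Server` nor an `X-Powered-By` header.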
(spark) branch branch-3.3 updated: [SPARK-46092][SQL][3.3] Don't push down Parquet row group filters that overflow
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new c9412307394f [SPARK-46092][SQL][3.3] Don't push down Parquet row group filters that overflow c9412307394f is described below commit c9412307394fd1a277dd7fd5b173ec34e4b123d6 Author: Johan Lasperas AuthorDate: Mon Dec 4 12:50:57 2023 -0800 [SPARK-46092][SQL][3.3] Don't push down Parquet row group filters that overflow This is a cherry-pick from https://github.com/apache/spark/pull/44006 to spark 3.3 ### What changes were proposed in this pull request? This change adds a check for overflows when creating Parquet row group filters on an INT32 (byte/short/int) parquet type to avoid incorrectly skipping row groups if the predicate value doesn't fit in an INT. This can happen if the read schema is specified as LONG, e.g via `.schema("col LONG")` While the Parquet readers don't support reading INT32 into a LONG, the overflow can lead to row groups being incorrectly skipped, bypassing the reader altogether and producing incorrect results instead of failing. ### Why are the changes needed? Reading a parquet file containing INT32 values with a read schema specified as LONG can produce incorrect results today: ``` Seq(0).toDF("a").write.parquet(path) spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect() ``` will return an empty result. The correct result is either: - Failing the query if the parquet reader doesn't support upcasting integers to longs (all parquet readers in Spark today) - Return result `[0]` if the parquet reader supports that upcast (no readers in Spark as of now, but I'm looking into adding this capability). ### Does this PR introduce _any_ user-facing change? 
The following: ``` Seq(0).toDF("a").write.parquet(path) spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect() ``` produces an (incorrect) empty result before this change. After this change, the read will fail, raising an error about the unsupported conversion from INT to LONG in the parquet reader. ### How was this patch tested? - Added tests to `ParquetFilterSuite` to ensure that no row group filter is created when the predicate value overflows or when the value type isn't compatible with the parquet type - Added test to `ParquetQuerySuite` covering the correctness issue described above. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44156 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.3. Authored-by: Johan Lasperas Signed-off-by: Dongjoon Hyun --- .../datasources/parquet/ParquetFilters.scala | 10 ++- .../datasources/parquet/ParquetFilterSuite.scala | 71 ++ .../datasources/parquet/ParquetQuerySuite.scala| 20 ++ 3 files changed, 99 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 210f37d473ad..969fbab746ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong} +import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort} import java.math.{BigDecimal => JBigDecimal} import java.nio.charset.StandardCharsets.UTF_8 import java.sql.{Date, Timestamp} @@ -600,7 +600,13 @@ class ParquetFilters( value == null || 
(nameToParquetField(name).fieldType match { case ParquetBooleanType => value.isInstanceOf[JBoolean] case ParquetIntegerType if value.isInstanceOf[Period] => true - case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] + case ParquetByteType | ParquetShortType | ParquetIntegerType => value match { +// Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type Int. +// We don't create a filter if the value would overflow. +case _: JByte | _: JShort | _: Integer => true +case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue +case _ => false + } case ParquetLongType => value.isInstanceOf[JLong] || value.isInstanceOf[Duration] case ParquetFloatType => value.isInstanceOf[JFloat] case ParquetDoubleType =>
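The guard added above only builds an INT32 row-group filter when the predicate value actually fits in an Int. The logic can be modeled in a few lines of Python; `can_build_int32_filter` is a simplified illustration of the Scala match, not Spark's actual API:

```python
INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1

def can_build_int32_filter(value):
    """Byte/Short/Int columns are stored as INT32 in Parquet, so a
    pushed-down filter is only safe when the predicate value fits in
    32 bits; otherwise no filter is created (simplified model)."""
    if value is None:
        return True  # null predicates are handled separately
    if isinstance(value, bool):
        return False  # booleans are not numeric predicates here
    if isinstance(value, int):
        return INT32_MIN <= value <= INT32_MAX
    return False

# Before the fix, a predicate like `a < Long.MaxValue` was truncated into
# an INT32 filter, wrongly skipping every row group.
assert can_build_int32_filter(0)
assert can_build_int32_filter(2**31 - 1)
assert not can_build_int32_filter(2**63 - 1)    # Long.MaxValue overflows
assert not can_build_int32_filter(-(2**31) - 1)
```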
(spark) branch master updated: [SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace `s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ac0bd2eb7b40 [SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace `s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter` ac0bd2eb7b40 is described below commit ac0bd2eb7b4089096f9fb288482b2f1b5049b7e2 Author: yangjie01 AuthorDate: Mon Dec 4 12:49:52 2023 -0800 [SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace `s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter` ### What changes were proposed in this pull request? This PR uses `s.c.MapOps.filter` to simplify the code pattern `s.c.MapOps.view.filterKeys`. ### Why are the changes needed? The coding pattern `s.c.MapOps.view.filterKeys` seems verbose; it can be simplified using `s.c.MapOps.filter`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #44160 from LuciferYang/SPARK-46245.
Authored-by: yangjie01 Signed-off-by: Dongjoon Hyun --- .../spark/sql/kafka010/KafkaContinuousStream.scala | 2 +- .../scala/org/apache/spark/deploy/master/Master.scala | 3 +-- .../spark/deploy/rest/RestSubmissionClient.scala | 4 ++-- .../spark/executor/CoarseGrainedExecutorBackend.scala | 8 .../org/apache/spark/resource/ResourceProfile.scala| 4 ++-- .../org/apache/spark/scheduler/DAGScheduler.scala | 4 +++- .../cluster/CoarseGrainedSchedulerBackend.scala| 2 +- .../scheduler/cluster/StandaloneSchedulerBackend.scala | 4 ++-- .../spark/storage/ShuffleBlockFetcherIterator.scala| 2 +- .../main/scala/org/apache/spark/ui/PagedTable.scala| 7 +++ .../org/apache/spark/HeartbeatReceiverSuite.scala | 2 +- .../scala/org/apache/spark/SparkThrowableSuite.scala | 5 ++--- .../spark/internal/plugin/PluginContainerSuite.scala | 2 +- .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 6 -- .../apache/spark/deploy/yarn/ExecutorRunnable.scala| 2 +- .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- .../apache/spark/sql/catalyst/catalog/interface.scala | 8 +++- .../catalyst/expressions/codegen/CodeGenerator.scala | 2 +- .../catalyst/plans/logical/basicLogicalOperators.scala | 5 +++-- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../apache/spark/sql/execution/command/tables.scala| 7 --- .../spark/sql/execution/datasources/DataSource.scala | 4 ++-- .../spark/sql/execution/datasources/FileFormat.scala | 2 +- .../sql/execution/datasources/jdbc/JDBCOptions.scala | 3 ++- .../sql/execution/datasources/v2/CacheTableExec.scala | 3 ++- .../execution/datasources/v2/DataSourceV2Utils.scala | 2 +- .../execution/datasources/v2/FileDataSourceV2.scala| 2 +- .../execution/datasources/v2/ShowCreateTableExec.scala | 18 ++ .../execution/datasources/v2/V2SessionCatalog.scala| 4 ++-- .../execution/streaming/state/RocksDBFileManager.scala | 6 +++--- .../apache/spark/sql/execution/ui/ExecutionPage.scala | 4 ++-- .../apache/spark/sql/streaming/DataStreamReader.scala | 2 +- 
.../apache/spark/sql/streaming/DataStreamWriter.scala | 2 +- .../apache/spark/sql/hive/HiveExternalCatalog.scala| 11 ++- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 7 +++ .../apache/spark/sql/hive/execution/HiveOptions.scala | 6 +++--- .../spark/sql/hive/HiveSchemaInferenceSuite.scala | 2 +- .../org/apache/spark/sql/hive/StatisticsSuite.scala| 10 +- .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 8 .../hive/execution/command/ShowCreateTableSuite.scala | 2 +- 40 files changed, 93 insertions(+), 88 deletions(-) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 026c4d560722..a86acd971a1c 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -102,7 +102,7 @@ class KafkaContinuousStream( } val startOffsets = newPartitionOffsets ++ - oldStartPartitionOffsets.view.filterKeys(!deletedPartitions.contains(_)) + oldStartPartitionOffsets.filter { case (k, _) => !deletedPartitions.contains(k) } knownPartitions = startOffsets.keySet startOffsets.toSeq.map { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0fe72e28ea5b..2e1d7b9bce33
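The `KafkaContinuousStream` hunk shown above maps naturally onto a Python dict comprehension: keep only the entries whose key survives a predicate, evaluated eagerly (like `s.c.MapOps.filter`) rather than through a lazy view (like `.view.filterKeys`). The data here is illustrative:

```python
# Python analogue of the Scala change: drop offsets for deleted partitions.
deleted_partitions = {"topic-1", "topic-2"}
old_start_offsets = {"topic-0": 10, "topic-1": 20, "topic-2": 30}

# Eager key filtering, equivalent to:
#   oldStartPartitionOffsets.filter { case (k, _) => !deletedPartitions.contains(k) }
start_offsets = {
    k: v for k, v in old_start_offsets.items() if k not in deleted_partitions
}
assert start_offsets == {"topic-0": 10}
```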
(spark) branch master updated: [SPARK-32246][BUILD][INFRA] Enable `streaming-kinesis-asl` tests in Github Action CI
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 64ec5f1017e1 [SPARK-32246][BUILD][INFRA] Enable `streaming-kinesis-asl` tests in Github Action CI 64ec5f1017e1 is described below commit 64ec5f1017e1f2ca479060ca76f18b1c4a803b81 Author: Junyu Chen AuthorDate: Mon Dec 4 12:44:11 2023 -0800 [SPARK-32246][BUILD][INFRA] Enable `streaming-kinesis-asl` tests in Github Action CI ### What changes were proposed in this pull request? This PR attempts to set up Kinesis tests in one of the existing Github Actions. Note that currently there are 57 tests in total in the Kinesis-asl module, and this PR enabled 35 of them. The remaining tests require interaction with the Amazon Kinesis service, which would incur billing costs to users. Hence they are not included in the Github Action. ### Why are the changes needed? Addressing the comments in this PR: https://github.com/apache/spark/pull/42581#issuecomment-1685925739 which attempts to upgrade the AWS SDK to v2 for the Spark Kinesis connector. Since the Kinesis tests are not being run in the Github Actions, there is no automated mechanism to verify the SDK v2 upgrade changes in this module. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? 1. All existing Github Actions passed. 2. All Kinesis tests passed when running locally: `export ENABLE_KINESIS_TESTS=1 && mvn test -Pkinesis-asl -pl connector/kinesis-asl` ``` Tests: succeeded 57, failed 0, canceled 0, ignored 0, pending 0 All tests passed. [INFO] [INFO] BUILD SUCCESS [INFO] [INFO] Total time: 13:25 min [INFO] Finished at: 2023-11-12T00:15:49+08:00 [INFO] ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43736 from junyuc25/junyuc25/kinesis-test.
Authored-by: Junyu Chen Signed-off-by: Dongjoon Hyun --- .github/workflows/build_and_test.yml | 2 +- dev/sparktestsupport/modules.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 4612b504ccdf..d58356ec1c5d 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -150,7 +150,7 @@ jobs: - >- mllib-local, mllib, graphx - >- -streaming, sql-kafka-0-10, streaming-kafka-0-10, +streaming, sql-kafka-0-10, streaming-kafka-0-10, streaming-kinesis-asl, yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl, connect, protobuf # Here, we split Hive and SQL tests into some of slow ones and the rest of them. diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 8995b7de0df9..15b2e8f186e5 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -359,7 +359,7 @@ streaming_kinesis_asl = Module( build_profile_flags=[ "-Pkinesis-asl", ], -environ={"ENABLE_KINESIS_TESTS": "1"}, +environ={"ENABLE_KINESIS_TESTS": "0"}, sbt_test_goals=[ "streaming-kinesis-asl/test", ], - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
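The gating mechanism here is an environment flag: the `streaming_kinesis_asl` module sets `environ={"ENABLE_KINESIS_TESTS": ...}` so that billable tests against the real service only run when explicitly enabled. A minimal Python sketch of that pattern (the suite and test names below are hypothetical stand-ins, not Spark's actual Kinesis suites, which are Scala):

```python
import os
import unittest

# Tests are skipped unless the ENABLE_KINESIS_TESTS environment flag is "1",
# mirroring how the module's `environ` setting gates costly tests in CI.
KINESIS_ENABLED = os.environ.get("ENABLE_KINESIS_TESTS") == "1"

class KinesisStreamTest(unittest.TestCase):
    @unittest.skipUnless(KINESIS_ENABLED, "ENABLE_KINESIS_TESTS is not set to 1")
    def test_against_live_kinesis(self):
        # Would talk to the real Amazon Kinesis service (incurs billing).
        self.assertTrue(True)

    def test_local_only(self):
        # Always runs: no external service involved.
        self.assertEqual(1 + 1, 2)

if __name__ == "__main__":
    unittest.main()
```

Run without the flag, only the local test executes; `export ENABLE_KINESIS_TESTS=1` opts in to the live test as well.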
(spark) branch branch-3.4 updated: [SPARK-46092][SQL][3.4] Don't push down Parquet row group filters that overflow
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 05b5c9e2e3df [SPARK-46092][SQL][3.4] Don't push down Parquet row group filters that overflow 05b5c9e2e3df is described below commit 05b5c9e2e3dfb7641e59895afc8ecb0f4f861127 Author: Johan Lasperas AuthorDate: Mon Dec 4 08:59:21 2023 -0800 [SPARK-46092][SQL][3.4] Don't push down Parquet row group filters that overflow This is a cherry-pick from https://github.com/apache/spark/pull/44006 to spark 3.4 ### What changes were proposed in this pull request? This change adds a check for overflows when creating Parquet row group filters on an INT32 (byte/short/int) parquet type to avoid incorrectly skipping row groups if the predicate value doesn't fit in an INT. This can happen if the read schema is specified as LONG, e.g via `.schema("col LONG")` While the Parquet readers don't support reading INT32 into a LONG, the overflow can lead to row groups being incorrectly skipped, bypassing the reader altogether and producing incorrect results instead of failing. ### Why are the changes needed? Reading a parquet file containing INT32 values with a read schema specified as LONG can produce incorrect results today: ``` Seq(0).toDF("a").write.parquet(path) spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect() ``` will return an empty result. The correct result is either: - Failing the query if the parquet reader doesn't support upcasting integers to longs (all parquet readers in Spark today) - Return result `[0]` if the parquet reader supports that upcast (no readers in Spark as of now, but I'm looking into adding this capability). ### Does this PR introduce _any_ user-facing change? 
The following: ``` Seq(0).toDF("a").write.parquet(path) spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect() ``` produces an (incorrect) empty result before this change. After this change, the read will fail, raising an error about the unsupported conversion from INT to LONG in the parquet reader. ### How was this patch tested? - Added tests to `ParquetFilterSuite` to ensure that no row group filter is created when the predicate value overflows or when the value type isn't compatible with the parquet type - Added test to `ParquetQuerySuite` covering the correctness issue described above. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44155 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.4. Authored-by: Johan Lasperas Signed-off-by: Dongjoon Hyun --- .../datasources/parquet/ParquetFilters.scala | 10 ++- .../datasources/parquet/ParquetFilterSuite.scala | 71 ++ .../datasources/parquet/ParquetQuerySuite.scala| 20 ++ 3 files changed, 99 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 6994e1ba39d9..5943dbdfb786 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong} +import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort} import java.math.{BigDecimal => JBigDecimal} import java.nio.charset.StandardCharsets.UTF_8 import java.sql.{Date, Timestamp} @@ -612,7 +612,13 @@ class ParquetFilters( value == null || 
(nameToParquetField(name).fieldType match { case ParquetBooleanType => value.isInstanceOf[JBoolean] case ParquetIntegerType if value.isInstanceOf[Period] => true - case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] + case ParquetByteType | ParquetShortType | ParquetIntegerType => value match { +// Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type Int. +// We don't create a filter if the value would overflow. +case _: JByte | _: JShort | _: Integer => true +case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue +case _ => false + } case ParquetLongType => value.isInstanceOf[JLong] || value.isInstanceOf[Duration] case ParquetFloatType => value.isInstanceOf[JFloat] case ParquetDoubleType =>
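The decision encoded by the guard above can be sketched outside Spark: only build an INT32 row group filter when the predicate value actually fits in 32 bits; otherwise build no filter at all and let the reader handle (or reject) the predicate. A minimal Python sketch, not Spark's actual code — Python's single unbounded `int` stands in for the separate JByte/JShort/Integer/JLong cases, so one range check replaces the per-type matches:

```python
INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1

def can_push_down_int32_filter(value):
    """Return True only when `value` can safely be compared against a
    Parquet INT32 (byte/short/int) column without overflow."""
    if value is None:
        return True  # null comparisons are handled separately upstream
    # bool is an int subclass in Python; a boolean is not a valid INT32 filter value
    if isinstance(value, bool) or not isinstance(value, int):
        return False
    return INT32_MIN <= value <= INT32_MAX

# An `a < Long.MaxValue` predicate on an INT32 column: truncating the value
# to 32 bits would wrongly skip every row group, so no filter is created.
assert not can_push_down_int32_filter(2**63 - 1)
assert can_push_down_int32_filter(0)
```

Skipping the pushdown is the conservative choice: a missing row group filter only costs some extra I/O, while a filter built from a truncated value silently drops data.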
(spark) branch branch-3.5 updated: [SPARK-46092][SQL][3.5] Don't push down Parquet row group filters that overflow
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 97472c91ed56 [SPARK-46092][SQL][3.5] Don't push down Parquet row group filters that overflow 97472c91ed56 is described below commit 97472c91ed5660c5af862e8da99d44a1c24f2815 Author: Johan Lasperas AuthorDate: Mon Dec 4 08:58:03 2023 -0800 [SPARK-46092][SQL][3.5] Don't push down Parquet row group filters that overflow This is a cherry-pick from https://github.com/apache/spark/pull/44006 to spark 3.5 ### What changes were proposed in this pull request? This change adds a check for overflows when creating Parquet row group filters on an INT32 (byte/short/int) parquet type to avoid incorrectly skipping row groups if the predicate value doesn't fit in an INT. This can happen if the read schema is specified as LONG, e.g via `.schema("col LONG")` While the Parquet readers don't support reading INT32 into a LONG, the overflow can lead to row groups being incorrectly skipped, bypassing the reader altogether and producing incorrect results instead of failing. ### Why are the changes needed? Reading a parquet file containing INT32 values with a read schema specified as LONG can produce incorrect results today: ``` Seq(0).toDF("a").write.parquet(path) spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect() ``` will return an empty result. The correct result is either: - Failing the query if the parquet reader doesn't support upcasting integers to longs (all parquet readers in Spark today) - Return result `[0]` if the parquet reader supports that upcast (no readers in Spark as of now, but I'm looking into adding this capability). ### Does this PR introduce _any_ user-facing change? 
The following: ``` Seq(0).toDF("a").write.parquet(path) spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect() ``` produces an (incorrect) empty result before this change. After this change, the read will fail, raising an error about the unsupported conversion from INT to LONG in the parquet reader. ### How was this patch tested? - Added tests to `ParquetFilterSuite` to ensure that no row group filter is created when the predicate value overflows or when the value type isn't compatible with the parquet type - Added test to `ParquetQuerySuite` covering the correctness issue described above. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44154 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.5. Authored-by: Johan Lasperas Signed-off-by: Dongjoon Hyun --- .../datasources/parquet/ParquetFilters.scala | 10 ++- .../datasources/parquet/ParquetFilterSuite.scala | 71 ++ .../datasources/parquet/ParquetQuerySuite.scala| 20 ++ 3 files changed, 99 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 5899b6621ad8..0983841dc8c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong} +import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort} import java.math.{BigDecimal => JBigDecimal} import java.nio.charset.StandardCharsets.UTF_8 import java.sql.{Date, Timestamp} @@ -612,7 +612,13 @@ class ParquetFilters( value == null || 
(nameToParquetField(name).fieldType match { case ParquetBooleanType => value.isInstanceOf[JBoolean] case ParquetIntegerType if value.isInstanceOf[Period] => true - case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] + case ParquetByteType | ParquetShortType | ParquetIntegerType => value match { +// Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type Int. +// We don't create a filter if the value would overflow. +case _: JByte | _: JShort | _: Integer => true +case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue +case _ => false + } case ParquetLongType => value.isInstanceOf[JLong] || value.isInstanceOf[Duration] case ParquetFloatType => value.isInstanceOf[JFloat] case ParquetDoubleType =>
(spark) branch master updated: [SPARK-46231][PYTHON] Migrate all remaining `NotImplementedError` & `TypeError` into PySpark error framework
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9666bf37958e [SPARK-46231][PYTHON] Migrate all remaining `NotImplementedError` & `TypeError` into PySpark error framework 9666bf37958e is described below commit 9666bf37958e5381278ca622bf7ec4b4ccb13d79 Author: Haejoon Lee AuthorDate: Mon Dec 4 08:54:50 2023 -0800 [SPARK-46231][PYTHON] Migrate all remaining `NotImplementedError` & `TypeError` into PySpark error framework ### What changes were proposed in this pull request? This PR proposes to migrate all remaining `NotImplementedError` and `TypeError` from `pyspark/sql/*` into the PySpark error framework (`PySparkNotImplementedError`), assigning dedicated error classes. ### Why are the changes needed? To improve the error handling in PySpark. ### Does this PR introduce _any_ user-facing change? No API changes, but the user-facing error messages will be improved. ### How was this patch tested? The existing CI should pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44148 from itholic/not_impl_and_type. 
Authored-by: Haejoon Lee Signed-off-by: Dongjoon Hyun --- python/pyspark/sql/datasource.py| 21 + .../sql/tests/pandas/test_pandas_udf_grouped_agg.py | 3 +++ python/pyspark/sql/udf.py | 8 +++- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py index 1c5b6d663285..4713ca5366a7 100644 --- a/python/pyspark/sql/datasource.py +++ b/python/pyspark/sql/datasource.py @@ -19,6 +19,7 @@ from typing import final, Any, Dict, Iterator, List, Sequence, Tuple, Type, Unio from pyspark.sql import Row from pyspark.sql.types import StructType +from pyspark.errors import PySparkNotImplementedError if TYPE_CHECKING: from pyspark.sql._typing import OptionalPrimitiveType @@ -103,7 +104,10 @@ class DataSource(ABC): >>> def schema(self): ... return StructType().add("a", "int").add("b", "string") """ -raise NotImplementedError +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "schema"}, +) def reader(self, schema: StructType) -> "DataSourceReader": """ @@ -121,7 +125,10 @@ class DataSource(ABC): reader : DataSourceReader A reader instance for this data source. """ -raise NotImplementedError +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "reader"}, +) def writer(self, schema: StructType, saveMode: str) -> "DataSourceWriter": """ @@ -142,7 +149,10 @@ class DataSource(ABC): writer : DataSourceWriter A writer instance for this data source. """ -raise NotImplementedError +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "writer"}, +) class InputPartition: @@ -239,7 +249,10 @@ class DataSourceReader(ABC): >>> def partitions(self): ... 
return [RangeInputPartition(1, 3), RangeInputPartition(5, 10)] """ -raise NotImplementedError +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "partitions"}, +) @abstractmethod def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py index b500be7a9695..455bb09a7dc4 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py @@ -720,6 +720,9 @@ class GroupedAggPandasUDFTestsMixin: class GroupedAggPandasUDFTests(GroupedAggPandasUDFTestsMixin, ReusedSQLTestCase): +def test_unsupported_types(self): +super().test_unsupported_types() + pass diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 9ffdbb218711..351bcea3f389 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -339,7 +339,13 @@ class UserDefinedFunction: try: # StructType is not yet allowed as a return type, explicitly check here to fail fast if isinstance(self._returnType_placeholder, StructType): -raise TypeError +raise PySparkNotImplementedError( +
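The pattern the diff applies is mechanical: replace a bare `raise NotImplementedError` with a structured exception that names the missing feature, while remaining catchable as the built-in type. A simplified, self-contained Python sketch — the `...Sketch` classes are hypothetical stand-ins, not the real classes from `pyspark.errors`:

```python
class PySparkNotImplementedErrorSketch(NotImplementedError):
    """Simplified stand-in for PySpark's PySparkNotImplementedError:
    subclassing NotImplementedError keeps old `except` handlers working,
    while error_class / message_parameters carry structured details."""

    def __init__(self, error_class, message_parameters):
        self.error_class = error_class
        self.message_parameters = message_parameters
        feature = message_parameters.get("feature", "?")
        super().__init__(f"[{error_class}] {feature} is not implemented.")

class DataSourceSketch:
    def schema(self):
        # A bare `raise NotImplementedError` gives callers nothing to
        # inspect; the structured error names the missing feature.
        raise PySparkNotImplementedErrorSketch(
            error_class="NOT_IMPLEMENTED",
            message_parameters={"feature": "schema"},
        )
```

Because the wrapper is still a `NotImplementedError`, existing `except NotImplementedError:` call sites keep functioning, which is what makes this migration non-breaking.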
(spark) branch master updated: [SPARK-46237][SQL][TESTS] Make `HiveDDLSuite` independently testable
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 37d19b9ee0e4 [SPARK-46237][SQL][TESTS] Make `HiveDDLSuite` independently testable 37d19b9ee0e4 is described below commit 37d19b9ee0e4e100e37358e71d771a2e42d01d88 Author: yangjie01 AuthorDate: Mon Dec 4 08:52:23 2023 -0800 [SPARK-46237][SQL][TESTS] Make `HiveDDLSuite` independently testable ### What changes were proposed in this pull request? When I test `HiveDDLSuite` with ``` build/sbt "hive/testOnly org.apache.spark.sql.hive.execution.HiveDDLSuite" -Phive ``` This test throws an error: ``` [info] - SPARK-34261: Avoid side effect if create exists temporary function *** FAILED *** (4 milliseconds) [info] java.util.NoSuchElementException: key not found: default [info] at scala.collection.MapOps.default(Map.scala:274) [info] at scala.collection.MapOps.default$(Map.scala:273) [info] at scala.collection.AbstractMap.default(Map.scala:405) [info] at scala.collection.MapOps.apply(Map.scala:176) [info] at scala.collection.MapOps.apply$(Map.scala:175) [info] at scala.collection.AbstractMap.apply(Map.scala:405) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$445(HiveDDLSuite.scala:3275) [info] at org.apache.spark.sql.test.SQLTestUtilsBase.withUserDefinedFunction(SQLTestUtils.scala:256) [info] at org.apache.spark.sql.test.SQLTestUtilsBase.withUserDefinedFunction$(SQLTestUtils.scala:254) [info] at org.apache.spark.sql.execution.command.DDLSuite.withUserDefinedFunction(DDLSuite.scala:326) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$444(HiveDDLSuite.scala:3267) ``` I manually printed the content of `spark.sparkContext.addedJars`, which is an empty `Map`. 
However, when I execute ``` build/sbt "hive/testOnly org.apache.spark.sql.hive.execution.SQLQuerySuite org.apache.spark.sql.hive.execution.HiveDDLSuite" -Phive ``` All tests pass, and the content of `spark.sparkContext.addedJars` is ``` Map(default -> Map(spark://localhost:54875/jars/SPARK-21101-1.0.jar -> 1701676986594, spark://localhost:54875/jars/hive-contrib-2.3.9.jar -> 1701676944590, spark://localhost:54875/jars/TestUDTF.jar -> 1701676921340)) ``` This failure is not reproduced in the GitHub Actions test because `SQLQuerySuite` is indeed executed before `HiveDDLSuite`. So in the current PR, I change the code to use `.get("default").foreach(_.remove(k))` so that the remove operation is only performed when `.get("default")` is not `None`. ### Why are the changes needed? Make `HiveDDLSuite` independently testable. ### Does this PR introduce _any_ user-facing change? No, this is a test-only change. ### How was this patch tested? - Pass GitHub Actions - Manually checked `HiveDDLSuite` with this PR and all tests passed ### Was this patch authored or co-authored using generative AI tooling? No Closes #44153 from LuciferYang/HiveDDLSuite. 
Authored-by: yangjie01 Signed-off-by: Dongjoon Hyun --- .../test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index c3a528da382a..2f5d1fcbb540 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -3270,7 +3270,7 @@ class HiveDDLSuite val jarName = "TestUDTF.jar" val jar = spark.asInstanceOf[TestHiveSparkSession].getHiveFile(jarName).toURI.toString spark.sparkContext.allAddedJars.keys.find(_.contains(jarName)) -.foreach(spark.sparkContext.addedJars("default").remove) +.foreach(k => spark.sparkContext.addedJars.get("default").foreach(_.remove(k))) assert(!spark.sparkContext.listJars().exists(_.contains(jarName))) val e = intercept[AnalysisException] { sql("CREATE TEMPORARY FUNCTION f1 AS " +
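The one-line fix swaps an unconditional lookup, `addedJars("default").remove`, which throws when the `"default"` key is absent, for an `Option`-guarded one, `addedJars.get("default").foreach(_.remove(k))`. The Python analogue of that guard, using `dict.get` in place of Scala's `Option` — `remove_jar` and the nested-dict shape (session -> {jar URL -> timestamp}, as printed above) are illustrative, not Spark API:

```python
def remove_jar(added_jars, jar_name):
    """Drop any jar whose key contains `jar_name`, but only if the
    "default" session entry exists at all."""
    default_jars = added_jars.get("default")  # None instead of an exception
    if default_jars is None:
        return  # fresh context, nothing to remove
    for key in [k for k in default_jars if jar_name in k]:
        default_jars.pop(key)

# HiveDDLSuite run in isolation: no jars were ever added, so there is no
# "default" entry yet -- the old unconditional lookup failed exactly here.
remove_jar({}, "TestUDTF.jar")  # no-op instead of an error
```

This is the same "look up, then act only if present" idiom in both languages; the test order dependency disappears because the empty-map case is now handled explicitly.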
(spark) branch master updated: [SPARK-46234][PYTHON] Introduce `PySparkKeyError` for PySpark error framework
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2423a4517ace [SPARK-46234][PYTHON] Introduce `PySparkKeyError` for PySpark error framework 2423a4517ace is described below commit 2423a4517ace1d76938ee3fd285594900f272552 Author: Haejoon Lee AuthorDate: Mon Dec 4 18:37:36 2023 +0800 [SPARK-46234][PYTHON] Introduce `PySparkKeyError` for PySpark error framework ### What changes were proposed in this pull request? This PR proposes to introduce `PySparkKeyError` for the PySpark error framework, and migrate the Python built-in `KeyError` into `PySparkKeyError`. ### Why are the changes needed? For better error handling. ### Does this PR introduce _any_ user-facing change? No API changes, but it improves the user-facing error messages. ### How was this patch tested? The existing CI should pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44151 from itholic/pyspark_keyerror. 
Authored-by: Haejoon Lee Signed-off-by: Ruifeng Zheng --- python/docs/source/reference/pyspark.errors.rst | 1 + python/pyspark/errors/__init__.py | 2 ++ python/pyspark/errors/error_classes.py | 5 + python/pyspark/errors/exceptions/base.py| 6 ++ python/pyspark/sql/types.py | 9 +++-- 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/python/docs/source/reference/pyspark.errors.rst b/python/docs/source/reference/pyspark.errors.rst index 88cbd405b83d..a4997506b41e 100644 --- a/python/docs/source/reference/pyspark.errors.rst +++ b/python/docs/source/reference/pyspark.errors.rst @@ -38,6 +38,7 @@ Classes PySparkAssertionError PySparkAttributeError PySparkException +PySparkKeyError PySparkNotImplementedError PySparkPicklingError PySparkRuntimeError diff --git a/python/pyspark/errors/__init__.py b/python/pyspark/errors/__init__.py index 923cb665d112..07033d216432 100644 --- a/python/pyspark/errors/__init__.py +++ b/python/pyspark/errors/__init__.py @@ -46,6 +46,7 @@ from pyspark.errors.exceptions.base import ( # noqa: F401 PySparkAssertionError, PySparkNotImplementedError, PySparkPicklingError, +PySparkKeyError, ) @@ -77,4 +78,5 @@ __all__ = [ "PySparkAssertionError", "PySparkNotImplementedError", "PySparkPicklingError", +"PySparkKeyError", ] diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index d0c0d1c115b0..8ca73ca85de6 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -387,6 +387,11 @@ ERROR_CLASSES_JSON = """ "Attribute `` is not supported in Spark Connect as it depends on the JVM. If you need to use this attribute, do not use Spark Connect when creating your session. Visit https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession for creating regular Spark Session in detail." ] }, + "KEY_NOT_EXISTS" : { +"message" : [ + "Key `` is not exists." 
+] + }, "KEY_VALUE_PAIR_REQUIRED" : { "message" : [ "Key-value pair or a list of pairs is required." diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py index 4a2b31418e29..0f4001483b7f 100644 --- a/python/pyspark/errors/exceptions/base.py +++ b/python/pyspark/errors/exceptions/base.py @@ -264,3 +264,9 @@ class PySparkImportError(PySparkException, ImportError): """ Wrapper class for ImportError to support error classes. """ + + +class PySparkKeyError(PySparkException, KeyError): +""" +Wrapper class for KeyError to support error classes. +""" diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 932a5a703ea9..cbfc4ab5df02 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -55,6 +55,7 @@ from pyspark.errors import ( PySparkTypeError, PySparkValueError, PySparkIndexError, +PySparkKeyError, ) if has_numpy: @@ -1042,7 +1043,9 @@ class StructType(DataType): for field in self: if field.name == key: return field -raise KeyError("No StructField named {0}".format(key)) +raise PySparkKeyError( +error_class="KEY_NOT_EXISTS", message_parameters={"key": str(key)} +) elif isinstance(key, int): try: return self.fields[key] @@ -2563,7 +2566,9 @@ class Row(tuple): idx = self.__fields__.index(item) return super(Row, self).__getitem__(idx) except IndexError: -raise KeyError(item) +raise PySparkKeyError( +
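The key design point in the diff is the dual inheritance, `class PySparkKeyError(PySparkException, KeyError)`: callers with existing `except KeyError:` handlers keep working, while the new structured fields are available for inspection. A self-contained sketch with simplified `...Sketch` stand-ins (not the real `pyspark.errors` classes):

```python
class PySparkExceptionSketch(Exception):
    """Simplified stand-in for pyspark.errors.PySparkException."""

    def __init__(self, error_class, message_parameters):
        self.error_class = error_class
        self.message_parameters = message_parameters
        super().__init__(f"[{error_class}] {message_parameters}")

class PySparkKeyErrorSketch(PySparkExceptionSketch, KeyError):
    """Dual inheritance keeps `except KeyError:` handlers working while
    exposing the structured error class, as in the diff above."""

def get_field(fields, name):
    # Toy lookup in the spirit of StructType.__getitem__ from the diff.
    for field_name, value in fields:
        if field_name == name:
            return value
    raise PySparkKeyErrorSketch(
        error_class="KEY_NOT_EXISTS", message_parameters={"key": name}
    )

try:
    get_field([("a", 1)], "b")
except KeyError as e:  # old-style handler still catches it
    assert e.error_class == "KEY_NOT_EXISTS"
```

The same trick appears elsewhere in the framework (e.g. `PySparkImportError(PySparkException, ImportError)` in the same file), so this sketch generalizes to any built-in being wrapped.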
(spark) branch master updated: [SPARK-46186][CONNECT] Fix illegal state transition when ExecuteThreadRunner interrupted before started
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new be9a710efb7b [SPARK-46186][CONNECT] Fix illegal state transition when ExecuteThreadRunner interrupted before started be9a710efb7b is described below commit be9a710efb7b1b9565c52766728d793fb497b483 Author: Juliusz Sompolski AuthorDate: Mon Dec 4 19:13:18 2023 +0900 [SPARK-46186][CONNECT] Fix illegal state transition when ExecuteThreadRunner interrupted before started ### What changes were proposed in this pull request? A race condition sending interrupt (or releaseSession) just after execute could cause: ``` [info] org.apache.spark.SparkException: java.lang.IllegalStateException: [info] operationId: fa653330-587a-41c7-a4a9-7987f070dcba with status Started [info] is not within statuses List(Finished, Failed, Canceled) for event Closed ``` and ``` [info] org.apache.spark.SparkException: java.lang.IllegalStateException: [info] operationId: fa653330-587a-41c7-a4a9-7987f070dcba with status Closed [info] is not within statuses List(Started, Analyzed, ReadyForExecution, Finished, Failed) for event Canceled ``` when the interrupt arrives before the thread in ExecuteThreadRunner is started. This would cause in ExecuteHolder close: ``` runner.interrupt() <- just sets interrupted = true runner.join() <- thread didn't start yet, so join() returns immediately, doesn't wait for thread to be actually interrupted ... eventsManager.postClosed() <- causes the first failure, because thread wasn't running and didn't move to Canceled ``` Afterwards, assuming we allowed the transition, the thread will get started, and then want to immediately move to Canceled, notice the `interrupted` state. Then it would hit the 2nd error, not allowing Canceled after Closed. 
While we could consider allowing the first transition (Started -> Closed), we don't want any events to come after Closed, so that listeners can clean up their state after Closed. The fix is to handle interrupts that arrive before the thread has started, and to prevent the thread from even starting if it was interrupted. ### Why are the changes needed? This was detected after the grpc 1.56 to 1.59 upgrade and causes some tests in SparkConnectServiceE2ESuite and ReattachableExecuteSuite to be flaky. With the grpc upgrade, execute is eagerly sent to the server, and in some tests we clean up and release the session without waiting for the execution to start. This triggered the flakiness. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Test added. The test depends on timing, so it may not fail reliably, but only from time to time. ### Was this patch authored or co-authored using generative AI tooling? GitHub Copilot assisted with some boilerplate auto-completion. Generated-by: Github Copilot Closes #44095 from juliuszsompolski/SPARK-46186. 
Authored-by: Juliusz Sompolski Signed-off-by: Hyukjin Kwon --- .../connect/execution/ExecuteThreadRunner.scala| 33 ++ .../spark/sql/connect/service/ExecuteHolder.scala | 7 - .../execution/ReattachableExecuteSuite.scala | 26 + 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 24b3c302b759..0ecdc4bdef96 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala @@ -42,6 +42,8 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends // forwarding of thread locals needs to be taken into account. private val executionThread: Thread = new ExecutionThread() + private var started: Boolean = false + private var interrupted: Boolean = false private var completed: Boolean = false @@ -49,12 +51,21 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends private val lock = new Object /** Launches the execution in a background thread, returns immediately. */ - def start(): Unit = { -executionThread.start() + private[connect] def start(): Unit = { +lock.synchronized { + assert(!started) + // Do not start if already interrupted. + if (!interrupted) { +executionThread.start() +started = true +
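The essence of the fix visible in the diff is that `start()` and `interrupt()` now coordinate through one lock over `started`/`interrupted` flags, so an interrupt that races ahead of `start()` simply prevents the execution thread from ever starting — avoiding the illegal Started -> Closed transition. A minimal Python threading sketch of that idea (hypothetical class, not the Scala `ExecuteThreadRunner` itself):

```python
import threading

class ExecuteRunnerSketch:
    """start() and interrupt() share one lock: if the interrupt arrives
    first, the worker thread is never started at all."""

    def __init__(self, work):
        self._thread = threading.Thread(target=work)
        self._lock = threading.Lock()
        self._started = False
        self._interrupted = False

    def start(self):
        with self._lock:
            assert not self._started
            if not self._interrupted:  # do not start if already interrupted
                self._thread.start()
                self._started = True

    def interrupt(self):
        with self._lock:
            self._interrupted = True
        return self._started  # tells the caller whether a thread exists to join

ran = []
runner = ExecuteRunnerSketch(lambda: ran.append(True))
runner.interrupt()  # interrupt races ahead of start()
runner.start()      # no-op: the worker never runs
assert ran == []
```

Without the lock, `interrupt()` followed by `join()` on a not-yet-started thread returns immediately, and the close path then posts events for a thread that later starts anyway — exactly the race described in the stack traces above.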