This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bdb617278c0b [SPARK-46565][PYTHON] Refine error classes and error messages for Python data sources bdb617278c0b is described below commit bdb617278c0bc83c04988055c3411d6c68926af7 Author: allisonwang-db <allison.w...@databricks.com> AuthorDate: Wed Jan 3 09:21:53 2024 +0900 [SPARK-46565][PYTHON] Refine error classes and error messages for Python data sources ### What changes were proposed in this pull request? This PR improves error classes and messages associated with Python data sources. It removes unnecessary error handling in Python and makes error class names more user-friendly. ### Why are the changes needed? To make the error messages clearer and more user-friendly. For instance, current stack traces has redundant information ``` AnalysisException: [PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON] Failed to create Python data source instance in Python: Traceback (most recent call last): ... pyspark.errors.exceptions.base.PySparkNotImplementedError: [NOT_IMPLEMENTED] schema is not implemented. During handling of the above exception, another exception occurred: Traceback (most recent call last): raise PySparkRuntimeError( pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_DATA_SOURCE_METHOD_NOT_IMPLEMENTED] Unable to create the Python data source instance because the 'schema' method hasn't been implemented. SQLSTATE: 38000 ``` After this PR, this `During handling of the above exception, another exception occurred:` will not show up. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #44560 from allisonwang-db/spark-46565-pyds-error-msgs. Authored-by: allisonwang-db <allison.w...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../src/main/resources/error/error-classes.json | 4 +- docs/sql-error-conditions.md | 4 +- python/pyspark/errors/error_classes.py | 45 +++++++-------------- python/pyspark/sql/tests/test_python_datasource.py | 12 ++---- python/pyspark/sql/worker/plan_data_source_read.py | 46 ++++++---------------- .../pyspark/sql/worker/write_into_data_source.py | 34 ++++++---------- .../spark/sql/errors/QueryCompilationErrors.scala | 4 +- .../python/UserDefinedPythonDataSource.scala | 10 ++--- .../execution/python/PythonDataSourceSuite.scala | 18 ++++----- 9 files changed, 63 insertions(+), 114 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index c8d8ec6e216a..87e43fe0e38c 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3009,9 +3009,9 @@ ], "sqlState" : "42K0G" }, - "PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON" : { + "PYTHON_DATA_SOURCE_ERROR" : { "message" : [ - "Failed to <action> Python data source <type> in Python: <msg>" + "Failed to <action> Python data source <type>: <msg>" ], "sqlState" : "38000" }, diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 248839666ef2..3f4074af9b78 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -1808,11 +1808,11 @@ Unable to locate Message `<messageName>` in Descriptor. Protobuf type not yet supported: `<protobufType>`. -### PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON +### PYTHON_DATA_SOURCE_ERROR [SQLSTATE: 38000](sql-error-conditions-sqlstates.html#class-38-external-routine-exception) -Failed to `<action>` Python data source `<type>` in Python: `<msg>` +Failed to `<action>` Python data source `<type>`: `<msg>` ### RECURSIVE_PROTOBUF_SCHEMA diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index 2200b73dffc1..7d627a508d12 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -194,6 +194,21 @@ ERROR_CLASSES_JSON = """ "Remote client cannot create a SparkContext. Create SparkSession instead." ] }, + "DATA_SOURCE_INVALID_RETURN_TYPE" : { + "message" : [ + "Unsupported return type ('<type>') from Python data source '<name>'. Expected types: <supported_types>." + ] + }, + "DATA_SOURCE_RETURN_SCHEMA_MISMATCH" : { + "message" : [ + "Return schema mismatch in the result from 'read' method. Expected: <expected> columns, Found: <actual> columns. Make sure the returned values match the required output schema." + ] + }, + "DATA_SOURCE_TYPE_MISMATCH" : { + "message" : [ + "Expected <expected>, but got <actual>." + ] + }, "DIFFERENT_PANDAS_DATAFRAME" : { "message" : [ "DataFrames are not almost equal:", @@ -747,36 +762,6 @@ ERROR_CLASSES_JSON = """ "Pipe function `<func_name>` exited with error code <error_code>." ] }, - "PYTHON_DATA_SOURCE_CREATE_ERROR" : { - "message" : [ - "Unable to create the Python data source <type>: <error>." - ] - }, - "PYTHON_DATA_SOURCE_METHOD_NOT_IMPLEMENTED" : { - "message" : [ - "Unable to create the Python data source <type> because the '<method>' method hasn't been implemented." - ] - }, - "PYTHON_DATA_SOURCE_READ_INVALID_RETURN_TYPE" : { - "message" : [ - "The data type of the returned value ('<type>') from the Python data source '<name>' is not supported. Supported types: <supported_types>." - ] - }, - "PYTHON_DATA_SOURCE_READ_RETURN_SCHEMA_MISMATCH" : { - "message" : [ - "The number of columns in the result does not match the required schema. Expected column count: <expected>, Actual column count: <actual>. Please make sure the values returned by the 'read' method have the same number of columns as required by the output schema." - ] - }, - "PYTHON_DATA_SOURCE_TYPE_MISMATCH" : { - "message" : [ - "Expected <expected>, but got <actual>." - ] - }, - "PYTHON_DATA_SOURCE_WRITE_ERROR" : { - "message" : [ - "Unable to write to the Python data source: <error>." - ] - }, "PYTHON_HASH_SEED_NOT_SET" : { "message" : [ "Randomness of hash of string should be disabled via PYTHONHASHSEED." diff --git a/python/pyspark/sql/tests/test_python_datasource.py b/python/pyspark/sql/tests/test_python_datasource.py index 8517d8f36382..ce629b2718e2 100644 --- a/python/pyspark/sql/tests/test_python_datasource.py +++ b/python/pyspark/sql/tests/test_python_datasource.py @@ -154,7 +154,7 @@ class BasePythonDataSourceTestsMixin: def test_data_source_read_output_none(self): self.register_data_source(read_func=lambda schema, partition: None) df = self.spark.read.format("test").load() - with self.assertRaisesRegex(PythonException, "PYTHON_DATA_SOURCE_READ_INVALID_RETURN_TYPE"): + with self.assertRaisesRegex(PythonException, "DATA_SOURCE_INVALID_RETURN_TYPE"): assertDataFrameEqual(df, []) def test_data_source_read_output_empty_iter(self): @@ -186,22 +186,18 @@ class BasePythonDataSourceTestsMixin: def test_data_source_read_output_with_schema_mismatch(self): self.register_data_source(read_func=lambda schema, partition: iter([(0, 1)])) df = self.spark.read.format("test").schema("i int").load() - with self.assertRaisesRegex( - PythonException, "PYTHON_DATA_SOURCE_READ_RETURN_SCHEMA_MISMATCH" - ): + with self.assertRaisesRegex(PythonException, "DATA_SOURCE_RETURN_SCHEMA_MISMATCH"): df.collect() self.register_data_source( read_func=lambda schema, partition: iter([(0, 1)]), output="i int, j int, k int" ) - with self.assertRaisesRegex( - PythonException, "PYTHON_DATA_SOURCE_READ_RETURN_SCHEMA_MISMATCH" - ): + with self.assertRaisesRegex(PythonException, "DATA_SOURCE_RETURN_SCHEMA_MISMATCH"): df.collect() def test_read_with_invalid_return_row_type(self): self.register_data_source(read_func=lambda schema, partition: iter([1])) df = self.spark.read.format("test").load() - with self.assertRaisesRegex(PythonException, "PYTHON_DATA_SOURCE_READ_INVALID_RETURN_TYPE"): + with self.assertRaisesRegex(PythonException, "DATA_SOURCE_INVALID_RETURN_TYPE"): df.collect() def test_in_memory_data_source(self): diff --git a/python/pyspark/sql/worker/plan_data_source_read.py b/python/pyspark/sql/worker/plan_data_source_read.py index d4693f5ff7be..8f1fc1e59a61 100644 --- a/python/pyspark/sql/worker/plan_data_source_read.py +++ b/python/pyspark/sql/worker/plan_data_source_read.py @@ -85,7 +85,7 @@ def main(infile: IO, outfile: IO) -> None: data_source = read_command(pickleSer, infile) if not isinstance(data_source, DataSource): raise PySparkAssertionError( - error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ "expected": "a Python data source instance of type 'DataSource'", "actual": f"'{type(data_source).__name__}'", @@ -97,7 +97,7 @@ def main(infile: IO, outfile: IO) -> None: input_schema = _parse_datatype_json_string(input_schema_json) if not isinstance(input_schema, StructType): raise PySparkAssertionError( - error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ "expected": "an input schema of type 'StructType'", "actual": f"'{type(input_schema).__name__}'", @@ -128,54 +128,32 @@ def main(infile: IO, outfile: IO) -> None: ) # Instantiate data source reader. - try: - reader = data_source.reader(schema=schema) - except NotImplementedError: - raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_METHOD_NOT_IMPLEMENTED", - message_parameters={"type": "reader", "method": "reader"}, - ) - except Exception as e: - raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_CREATE_ERROR", - message_parameters={"type": "reader", "error": str(e)}, - ) + reader = data_source.reader(schema=schema) # Get the partitions if any. try: partitions = reader.partitions() if not isinstance(partitions, list): raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_CREATE_ERROR", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ - "type": "reader", - "error": ( - "Expect 'partitions' to return a list, but got " - f"'{type(partitions).__name__}'" - ), + "expected": "'partitions' to return a list", + "actual": f"'{type(partitions).__name__}'", }, ) if not all(isinstance(p, InputPartition) for p in partitions): partition_types = ", ".join([f"'{type(p).__name__}'" for p in partitions]) raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_CREATE_ERROR", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ - "type": "reader", - "error": ( - "All elements in 'partitions' should be of type " - f"'InputPartition', but got {partition_types}" - ), + "expected": "all elements in 'partitions' to be of type 'InputPartition'", + "actual": partition_types, }, ) if len(partitions) == 0: partitions = [None] # type: ignore except NotImplementedError: partitions = [None] # type: ignore - except Exception as e: - raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_CREATE_ERROR", - message_parameters={"type": "reader", "error": str(e)}, - ) # Wrap the data source read logic in an mapInArrow UDF. import pyarrow as pa @@ -222,7 +200,7 @@ def main(infile: IO, outfile: IO) -> None: # Validate the output iterator. if not isinstance(output_iter, Iterator): raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_READ_INVALID_RETURN_TYPE", + error_class="DATA_SOURCE_INVALID_RETURN_TYPE", message_parameters={ "type": type(output_iter).__name__, "name": data_source.name(), @@ -243,7 +221,7 @@ def main(infile: IO, outfile: IO) -> None: # Validate the output row schema. if hasattr(result, "__len__") and len(result) != num_cols: raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_READ_RETURN_SCHEMA_MISMATCH", + error_class="DATA_SOURCE_RETURN_SCHEMA_MISMATCH", message_parameters={ "expected": str(num_cols), "actual": str(len(result)), @@ -253,7 +231,7 @@ def main(infile: IO, outfile: IO) -> None: # Validate the output row type. if not isinstance(result, (list, tuple)): raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_READ_INVALID_RETURN_TYPE", + error_class="DATA_SOURCE_INVALID_RETURN_TYPE", message_parameters={ "type": type(result).__name__, "name": data_source.name(), diff --git a/python/pyspark/sql/worker/write_into_data_source.py b/python/pyspark/sql/worker/write_into_data_source.py index 7db2744e16f8..36b3c23b3379 100644 --- a/python/pyspark/sql/worker/write_into_data_source.py +++ b/python/pyspark/sql/worker/write_into_data_source.py @@ -84,7 +84,7 @@ def main(infile: IO, outfile: IO) -> None: data_source_cls = read_command(pickleSer, infile) if not (isinstance(data_source_cls, type) and issubclass(data_source_cls, DataSource)): raise PySparkAssertionError( - error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ "expected": "a subclass of DataSource", "actual": f"'{type(data_source_cls).__name__}'", @@ -94,7 +94,7 @@ def main(infile: IO, outfile: IO) -> None: # Check the name method is a class method. if not inspect.ismethod(data_source_cls.name): raise PySparkTypeError( - error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ "expected": "'name()' method to be a classmethod", "actual": f"'{type(data_source_cls.name).__name__}'", @@ -107,7 +107,7 @@ def main(infile: IO, outfile: IO) -> None: # Check if the provider name matches the data source's name. if provider.lower() != data_source_cls.name().lower(): raise PySparkAssertionError( - error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ "expected": f"provider with name {data_source_cls.name()}", "actual": f"'{provider}'", @@ -118,7 +118,7 @@ def main(infile: IO, outfile: IO) -> None: schema = _parse_datatype_json_string(utf8_deserializer.loads(infile)) if not isinstance(schema, StructType): raise PySparkAssertionError( - error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ "expected": "the schema to be a 'StructType'", "actual": f"'{type(data_source_cls).__name__}'", @@ -129,7 +129,7 @@ def main(infile: IO, outfile: IO) -> None: return_type = _parse_datatype_json_string(utf8_deserializer.loads(infile)) if not isinstance(return_type, StructType): raise PySparkAssertionError( - error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ "expected": "a return type of type 'StructType'", "actual": f"'{type(return_type).__name__}'", @@ -153,22 +153,10 @@ def main(infile: IO, outfile: IO) -> None: overwrite = read_bool(infile) # Instantiate a data source. - try: - data_source = data_source_cls(options=options) - except Exception as e: - raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_CREATE_ERROR", - message_parameters={"type": "instance", "error": str(e)}, - ) + data_source = data_source_cls(options=options) # Instantiate the data source writer. - try: - writer = data_source.writer(schema, overwrite) - except Exception as e: - raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_CREATE_ERROR", - message_parameters={"type": "writer", "error": str(e)}, - ) + writer = data_source.writer(schema, overwrite) # Create a function that can be used in mapInArrow. import pyarrow as pa @@ -193,10 +181,12 @@ def main(infile: IO, outfile: IO) -> None: # Check the commit message has the right type. if not isinstance(res, WriterCommitMessage): raise PySparkRuntimeError( - error_class="PYTHON_DATA_SOURCE_WRITE_ERROR", + error_class="DATA_SOURCE_TYPE_MISMATCH", message_parameters={ - "error": f"return type of the `write` method must be " - f"an instance of WriterCommitMessage, but got {type(res)}" + "expected": ( + "'WriterCommitMessage' as the return type of " "the `write` method" + ), + "actual": type(res).__name__, }, ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index e8235fd10466..bc847d1c0069 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2040,9 +2040,9 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat ) } - def failToPlanDataSourceError(action: String, tpe: String, msg: String): Throwable = { + def pythonDataSourceError(action: String, tpe: String, msg: String): Throwable = { new AnalysisException( - errorClass = "PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON", + errorClass = "PYTHON_DATA_SOURCE_ERROR", messageParameters = Map("action" -> action, "type" -> tpe, "msg" -> msg) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala index 2812e31e7a8c..4b567c591672 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala @@ -459,7 +459,7 @@ class UserDefinedPythonDataSourceLookupRunner(lookupSources: PythonFunction) val length = dataIn.readInt() if (length == SpecialLengths.PYTHON_EXCEPTION_THROWN) { val msg = PythonWorkerUtils.readUTF(dataIn) - throw QueryCompilationErrors.failToPlanDataSourceError( + throw QueryCompilationErrors.pythonDataSourceError( action = "lookup", tpe = "instance", msg = msg) } @@ -524,7 +524,7 @@ class UserDefinedPythonDataSourceRunner( val length = dataIn.readInt() if (length == SpecialLengths.PYTHON_EXCEPTION_THROWN) { val msg = PythonWorkerUtils.readUTF(dataIn) - throw QueryCompilationErrors.failToPlanDataSourceError( + throw QueryCompilationErrors.pythonDataSourceError( action = "create", tpe = "instance", msg = msg) } @@ -587,7 +587,7 @@ class UserDefinedPythonDataSourceReadRunner( val length = dataIn.readInt() if (length == SpecialLengths.PYTHON_EXCEPTION_THROWN) { val msg = PythonWorkerUtils.readUTF(dataIn) - throw QueryCompilationErrors.failToPlanDataSourceError( + throw QueryCompilationErrors.pythonDataSourceError( action = "plan", tpe = "read", msg = msg) } @@ -657,7 +657,7 @@ class UserDefinedPythonDataSourceWriteRunner( val length = dataIn.readInt() if (length == SpecialLengths.PYTHON_EXCEPTION_THROWN) { val msg = PythonWorkerUtils.readUTF(dataIn) - throw QueryCompilationErrors.failToPlanDataSourceError( + throw QueryCompilationErrors.pythonDataSourceError( action = "plan", tpe = "write", msg = msg) } @@ -707,7 +707,7 @@ class UserDefinedPythonDataSourceCommitRunner( val code = dataIn.readInt() if (code == SpecialLengths.PYTHON_EXCEPTION_THROWN) { val msg = PythonWorkerUtils.readUTF(dataIn) - throw QueryCompilationErrors.failToPlanDataSourceError( + throw QueryCompilationErrors.pythonDataSourceError( action = "commit or abort", tpe = "write", msg = msg) } assert(code == 0, s"Python commit job should run successfully, but got exit code: $code") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala index 49fb2e859fff..3e7cd82db8d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala @@ -299,8 +299,8 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession { val err = intercept[AnalysisException] { spark.read.format(dataSourceName).schema(schema).load().collect() } - assert(err.getErrorClass == "PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON") - assert(err.getMessage.contains("PYTHON_DATA_SOURCE_METHOD_NOT_IMPLEMENTED")) + assert(err.getErrorClass == "PYTHON_DATA_SOURCE_ERROR") + assert(err.getMessage.contains("PySparkNotImplementedError")) } test("error creating reader") { @@ -319,8 +319,7 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession { val err = intercept[AnalysisException] { spark.read.format(dataSourceName).schema(schema).load().collect() } - assert(err.getErrorClass == "PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON") - assert(err.getMessage.contains("PYTHON_DATA_SOURCE_CREATE_ERROR")) + assert(err.getErrorClass == "PYTHON_DATA_SOURCE_ERROR") assert(err.getMessage.contains("error creating reader")) } @@ -339,8 +338,8 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession { val err = intercept[AnalysisException] { spark.read.format(dataSourceName).schema(schema).load().collect() } - assert(err.getErrorClass == "PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON") - assert(err.getMessage.contains("PYTHON_DATA_SOURCE_TYPE_MISMATCH")) + assert(err.getErrorClass == "PYTHON_DATA_SOURCE_ERROR") + assert(err.getMessage.contains("DATA_SOURCE_TYPE_MISMATCH")) assert(err.getMessage.contains("PySparkAssertionError")) } @@ -450,8 +449,8 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession { spark.dataSource.registerPython(dataSourceName, dataSource) val err = intercept[AnalysisException]( spark.read.format(dataSourceName).load().collect()) - assert(err.getErrorClass == "PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON") - assert(err.getMessage.contains("PYTHON_DATA_SOURCE_CREATE_ERROR")) + assert(err.getErrorClass == "PYTHON_DATA_SOURCE_ERROR") + assert(err.getMessage.contains("partitions")) } } @@ -618,7 +617,8 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession { val error = intercept[SparkException] { spark.range(1).write.format(dataSourceName).mode("append").save() } - assert(error.getMessage.contains("PYTHON_DATA_SOURCE_WRITE_ERROR")) + assert(error.getMessage.contains("DATA_SOURCE_TYPE_MISMATCH")) + assert(error.getMessage.contains("WriterCommitMessage")) } withClue("without mode") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org