(spark) branch master updated: [SPARK-46950][CORE][SQL] Align `not available codec` error-class
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 88d4681dd6ab [SPARK-46950][CORE][SQL] Align `not available codec` error-class 88d4681dd6ab is described below commit 88d4681dd6ab37d65d704b0119ba2a54801fffde Author: panbingkun AuthorDate: Sat Feb 3 10:21:58 2024 +0300 [SPARK-46950][CORE][SQL] Align `not available codec` error-class ### What changes were proposed in this pull request? The pr aims to align `not available codec` error-class, includes: - In `core`, convert `CODEC_NOT_AVAILABLE` to `CODEC_NOT_AVAILABLE.WITH_CONF_SUGGESTION`. - In `sql`, use `CODEC_NOT_AVAILABLE.WITH_AVAILABLE_CODECS_SUGGESTION` to cover `IllegalArgumentException`. ### Why are the changes needed? When I was solving a case where `the compression option` is `null`, as follows: https://github.com/apache/spark/assets/15246973/0e2abdad-0c1c-4ade-9f48-014bc4372fc6";> I found that we could first complete the extraction logic for `the codec error`. At the same time, I found that there was already an `CODEC_NOT_AVAILABLE` error in the `error-classes.json`, but it had some differences from the error prompt in `SQL` for compress. Therefore, I proposed using `CODEC_NOT_AVAILABLE` as `a parent class error` and adding `two subclass errors`: `WITH_AVAILABLE_CODECS_SUGGESTION` and `WITH_CONF_SUGGESTION`. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? - Update existed UT. - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44992 from panbingkun/codec_improve. 
Lead-authored-by: panbingkun Co-authored-by: Maxim Gekk Signed-off-by: Max Gekk --- .../src/main/resources/error/error-classes.json| 14 +++- .../org/apache/spark/errors/SparkCoreErrors.scala | 13 ++- .../org/apache/spark/io/CompressionCodec.scala | 9 ++--- .../apache/spark/io/CompressionCodecSuite.scala| 2 +- ...r-conditions-codec-not-available-error-class.md | 41 ++ docs/sql-error-conditions.md | 6 ++-- .../sql/catalyst/util/CompressionCodecs.scala | 7 ++-- .../spark/sql/errors/QueryExecutionErrors.scala| 8 + .../sql/execution/datasources/orc/OrcOptions.scala | 4 +-- .../datasources/parquet/ParquetOptions.scala | 5 +-- .../ParquetCompressionCodecPrecedenceSuite.scala | 14 +--- .../sql/execution/datasources/text/TextSuite.scala | 18 ++ 12 files changed, 112 insertions(+), 29 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 6d88f5ee511c..8399311cbfc4 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -441,8 +441,20 @@ }, "CODEC_NOT_AVAILABLE" : { "message" : [ - "The codec is not available. Consider to set the config to ." + "The codec is not available." ], +"subClass" : { + "WITH_AVAILABLE_CODECS_SUGGESTION" : { +"message" : [ + "Available codecs are ." +] + }, + "WITH_CONF_SUGGESTION" : { +"message" : [ + "Consider to set the config to ." 
+] + } +}, "sqlState" : "56038" }, "CODEC_SHORT_NAME_NOT_FOUND" : { diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala index 641310f2a0c5..a131f8233b0d 100644 --- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala +++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala @@ -24,7 +24,9 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException, TaskNotSerializableException} +import org.apache.spark.{SparkException, SparkIllegalArgumentException, SparkRuntimeException, SparkUnsupportedOperationException, TaskNotSerializableException} +import org.apache.spark.internal.config.IO_COMPRESSION_CODEC +import org.apache.spark.io.CompressionCodec.FALLBACK_COMPRESSION_CODEC import org.apache.spark.memory.SparkOutOfMemoryError import org.apache.spark.scheduler.{BarrierJobRunWithDynamicAllocationException, BarrierJobSlotsNumberCheckFailed, BarrierJobUnsupportedRDDChainException} import org.apache.spark.shuffle.{FetchFailedException, ShuffleManager} @@ -490,6 +492,15 @@ private[spark] object SparkCoreErrors { cause = null) } + def codecNotAvailableError(codecName: String): Throwable = { +new SparkIllegalArgumentException( + errorClass = "CODEC_NOT_AVAILABLE.WITH_CONF_SUGG
(spark) branch master updated: [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 16ac82092bb7 [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning 16ac82092bb7 is described below commit 16ac82092bb775aafd010e2fb02b7ddc1eceea73 Author: Daniel Tenedorio AuthorDate: Sat Feb 3 08:50:44 2024 +0300 [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning ### What changes were proposed in this pull request? This PR fixes a CSV parsing bug with existence default values and column pruning (https://issues.apache.org/jira/browse/SPARK-46890). The bug fix includes disabling column pruning specifically when checking the CSV header schema against the required schema expected by Catalyst. This makes the expected schema match what the CSV parser provides, since later we also happen to instruct the CSV parser to disable column pruning and instead read each entire row in order to correctly assign the default value(s) during execution. ### Why are the changes needed? Before this change, queries from a subset of the columns in a CSV table whose `CREATE TABLE` statement contained default values would return an internal exception. 
For example: ``` CREATE TABLE IF NOT EXISTS products ( product_id INT, name STRING, price FLOAT default 0.0, quantity INT default 0 ) USING CSV OPTIONS ( header 'true', inferSchema 'false', enforceSchema 'false', path '/Users/maximgekk/tmp/products.csv' ); ``` The CSV file products.csv: ``` product_id,name,price,quantity 1,Apple,0.50,100 2,Banana,0.25,200 3,Orange,0.75,50 ``` The query fails: ``` spark-sql (default)> SELECT price FROM products; 24/01/28 11:43:09 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 6) java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema: Header length: 4, schema size: 1 CSV file: file:///Users/Daniel.Tenedorio/tmp/products.csv ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This PR adds test coverage. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44939 from dtenedor/debug-csv-default. Authored-by: Daniel Tenedorio Signed-off-by: Max Gekk --- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 15 - .../spark/sql/catalyst/csv/UnivocityParser.scala | 4 +-- .../execution/datasources/csv/CSVFileFormat.scala | 5 ++- .../v2/csv/CSVPartitionReaderFactory.scala | 6 +++- .../sql/execution/datasources/csv/CSVSuite.scala | 38 ++ 5 files changed, 62 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index c5a6bf5076de..f4ade722791c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -26,8 +26,10 @@ import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, Unescape import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions} import 
org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} +import org.apache.spark.sql.types.StructType class CSVOptions( @transient val parameters: CaseInsensitiveMap[String], @@ -278,13 +280,24 @@ class CSVOptions( .getOrElse(UNESCAPED_QUOTE_HANDLING, "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT)) /** + * Returns true if column pruning is enabled and there are no existence column default values in + * the [[schema]]. + * * The column pruning feature can be enabled either via the CSV option `columnPruning` or * in non-multiline mode via initialization of CSV options by the SQL config: * `spark.sql.csv.parser.columnPruning.enabled`. * The feature is disabled in the `multiLine` mode because of the issue: * https://github.com/uniVocity/univocity-parsers/issues/529 + * + * We disable column pruning when there are any column defaults, instead preferring to reach in + * each row and then post-process it to substitute the default values after. */ - val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine &&
(spark) branch master updated: [SPARK-46965][CORE] Check `logType` in `Utils.getLog`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 84387394c387 [SPARK-46965][CORE] Check `logType` in `Utils.getLog` 84387394c387 is described below commit 84387394c387c7a6c171714f5d45d517b6bec7af Author: Dongjoon Hyun AuthorDate: Fri Feb 2 17:22:32 2024 -0800 [SPARK-46965][CORE] Check `logType` in `Utils.getLog` ### What changes were proposed in this pull request? This PR aims to check `logType` in `Utils.getLog`. ### Why are the changes needed? To prevent security vulnerability. ### Does this PR introduce _any_ user-facing change? No. This is a new module which is not released yet. ### How was this patch tested? Manually. **BEFORE** ``` $ sbin/start-master.sh $ curl -s 'http://localhost:8080/logPage/self?logType=../../../../../../etc/nfs.conf' | grep NFS # nfs.conf: the NFS configuration file ``` **AFTER** ``` $ sbin/start-master.sh $ curl -s 'http://localhost:8080/logPage/self?logType=../../../../../../etc/nfs.conf' | grep NFS ``` For `Spark History Server`, the same check with 18080 port. ``` $ curl -s 'http://localhost:18080/logPage/self?logType=../../../../../../../etc/nfs.conf' | grep NFS ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #45006 from dongjoon-hyun/SPARK-46965. 
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/deploy/Utils.scala | 4 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/Utils.scala b/core/src/main/scala/org/apache/spark/deploy/Utils.scala index 9bbcc9f314b2..32328ae1e07a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Utils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Utils.scala @@ -32,6 +32,7 @@ import org.apache.spark.util.logging.RollingFileAppender */ private[deploy] object Utils extends Logging { val DEFAULT_BYTES = 100 * 1024 + val SUPPORTED_LOG_TYPES = Set("stderr", "stdout", "out") def addRenderLogHandler(page: WebUI, conf: SparkConf): Unit = { page.attachHandler(createServletHandler("/log", @@ -58,6 +59,9 @@ private[deploy] object Utils extends Logging { logType: String, offsetOption: Option[Long], byteLength: Int): (String, Long, Long, Long) = { +if (!SUPPORTED_LOG_TYPES.contains(logType)) { + return ("Error: Log type must be one of " + SUPPORTED_LOG_TYPES.mkString(", "), 0, 0, 0) +} try { // Find a log file name val fileName = if (logType.equals("out")) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-46964][SQL] Change the signature of the hllInvalidLgK query execution error to take an integer as 4th argument
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0965412d5174 [SPARK-46964][SQL] Change the signature of the hllInvalidLgK query execution error to take an integer as 4th argument 0965412d5174 is described below commit 0965412d517441a15d4da0b5fc8fe34a9b5ec40f Author: Menelaos Karavelas AuthorDate: Fri Feb 2 11:55:21 2024 -0800 [SPARK-46964][SQL] Change the signature of the hllInvalidLgK query execution error to take an integer as 4th argument ### What changes were proposed in this pull request? The current signature of the `hllInvalidLgK` query execution error takes four arguments: 1. The SQL function (a string). 2. The minimum possible `lgk` value (an integer). 3. The maximum possible `lgk` value (an integer). 4. The actual invalid `lgk` value (a string). There is no meaningful reason for the 4th argument to be a string. In this PR we change it to be an integer, just like the minimum and maximum valid values. ### Why are the changes needed? Seeking to make the signature of the `hllInvalidLgK` error more meaningful and self-consistent. ### Does this PR introduce _any_ user-facing change? No, there is no user-facing changes because of this PR. This is just an internal change. ### How was this patch tested? Existing tests suffice. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44995 from mkaravel/hll-invalid-lgk-error-arg. 
Authored-by: Menelaos Karavelas Signed-off-by: Gengliang Wang --- .../sql/catalyst/expressions/aggregate/datasketchesAggregates.scala | 2 +- .../main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala index 595ae32d77b9..02925f3625d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala @@ -196,7 +196,7 @@ object HllSketchAgg { def checkLgK(lgConfigK: Int): Unit = { if (lgConfigK < minLgConfigK || lgConfigK > maxLgConfigK) { throw QueryExecutionErrors.hllInvalidLgK(function = "hll_sketch_agg", -min = minLgConfigK, max = maxLgConfigK, value = lgConfigK.toString) +min = minLgConfigK, max = maxLgConfigK, value = lgConfigK) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 9ff076c5fd50..af5cafdc8a3a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2601,14 +2601,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE cause = e) } - def hllInvalidLgK(function: String, min: Int, max: Int, value: String): Throwable = { + def hllInvalidLgK(function: String, min: Int, max: Int, value: Int): Throwable = { new SparkRuntimeException( errorClass = "HLL_INVALID_LG_K", messageParameters = Map( "function" -> toSQLId(function), "min" -> toSQLValue(min, IntegerType), "max" -> toSQLValue(max, 
IntegerType), -"value" -> value)) +"value" -> toSQLValue(value, IntegerType))) } def hllInvalidInputSketchBuffer(function: String): Throwable = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-46915][SQL] Simplify `UnaryMinus` `Abs` and align error class
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e35e29a0517d [SPARK-46915][SQL] Simplify `UnaryMinus` `Abs` and align error class e35e29a0517d is described below commit e35e29a0517db930e12fe801f0f0ab1a31c3b23e Author: panbingkun AuthorDate: Fri Feb 2 20:33:31 2024 +0300 [SPARK-46915][SQL] Simplify `UnaryMinus` `Abs` and align error class ### What changes were proposed in this pull request? The pr aims to: - simplify `UnaryMinus` & `Abs` - convert error-class `_LEGACY_ERROR_TEMP_2043` to `ARITHMETIC_OVERFLOW`, and remove it. ### Why are the changes needed? 1.When the data type in `UnaryMinus` and `Abs` is `ByteType` or `ShortType`, if `an overflow exception` occurs, the corresponding error class is: `_LEGACY_ERROR_TEMP_2043` But when the data type is `IntegerType` or `LongType`, if `an overflow exception` occurs, its corresponding error class is: ARITHMETIC_OVERFLOW, We should unify it. 2.In the `codegen` logic of `UnaryMinus` and `Abs`, there is a difference between the logic of generating code when the data type is `ByteType` or `ShortType` and when the data type is `IntegerType` or `LongType`. We can unify it and simplify the code. ### Does this PR introduce _any_ user-facing change? Yes, ### How was this patch tested? - Update existed UT. - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44942 from panbingkun/UnaryMinus_improve. 
Authored-by: panbingkun Signed-off-by: Max Gekk --- .../src/main/resources/error/error-classes.json| 5 --- .../sql/catalyst/expressions/arithmetic.scala | 45 -- .../spark/sql/errors/QueryExecutionErrors.scala| 8 .../org/apache/spark/sql/types/numerics.scala | 6 +-- .../expressions/ArithmeticExpressionSuite.scala| 27 - 5 files changed, 36 insertions(+), 55 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 136825ab374d..6d88f5ee511c 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -5747,11 +5747,6 @@ ". If necessary set to false to bypass this error." ] }, - "_LEGACY_ERROR_TEMP_2043" : { -"message" : [ - "- caused overflow." -] - }, "_LEGACY_ERROR_TEMP_2045" : { "message" : [ "Unsupported table change: " diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index 9f1b42ad84d3..0f95ae821ab0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -60,23 +60,15 @@ case class UnaryMinus( override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = dataType match { case _: DecimalType => defineCodeGen(ctx, ev, c => s"$c.unary_$$minus()") -case ByteType | ShortType if failOnError => +case ByteType | ShortType | IntegerType | LongType if failOnError => + val typeUtils = TypeUtils.getClass.getCanonicalName.stripSuffix("$") + val refDataType = ctx.addReferenceObj("refDataType", dataType, dataType.getClass.getName) nullSafeCodeGen(ctx, ev, eval => { val javaBoxedType = CodeGenerator.boxedType(dataType) -val javaType = CodeGenerator.javaType(dataType) -val originValue = ctx.freshName("origin") s""" - |$javaType 
$originValue = ($javaType)($eval); - |if ($originValue == $javaBoxedType.MIN_VALUE) { - | throw QueryExecutionErrors.unaryMinusCauseOverflowError($originValue); - |} - |${ev.value} = ($javaType)(-($originValue)); - """.stripMargin - }) -case IntegerType | LongType if failOnError => - val mathUtils = MathUtils.getClass.getCanonicalName.stripSuffix("$") - nullSafeCodeGen(ctx, ev, eval => { -s"${ev.value} = $mathUtils.negateExact($eval);" + |${ev.value} = ($javaBoxedType)$typeUtils.getNumeric( + | $refDataType, $failOnError).negate($eval); + """.stripMargin }) case dt: NumericType => nullSafeCodeGen(ctx, ev, eval => { val originValue = ctx.freshName("origin") @@ -181,23 +173,16 @@ case class Abs(child: Expression, failOnError: Boolean = SQLConf.get.ansiEnabled case _: DecimalType => defineCodeGen(ctx, ev, c => s"$c.abs()") -case ByteType | ShortType if failOnError => - val javaBoxedType = CodeGenerator.boxedTy
(spark) branch master updated: [SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 25d96f7bacb4 [SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl 25d96f7bacb4 is described below commit 25d96f7bacb43a7d5a835454ecc075e40d4f3c93 Author: Eric Marnadi AuthorDate: Fri Feb 2 22:32:42 2024 +0900 [SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl ### What changes were proposed in this pull request? Adding the `deleteIfExists` method to the `StatefulProcessorHandle` in order to remove state variables from the State Store. Implemented only for RocksDBStateStoreProvider, as we do not currently support multiple column families for HDFS. ### Why are the changes needed? This functionality is needed so users can remove state from the state store from the StatefulProcessorHandleImpl ### Does this PR introduce _any_ user-facing change? Yes - this functionality (removing column families) was previously not supported from our RocksDB client. ### How was this patch tested? Added a unit test that creates two streams with the same checkpoint directory. The second stream removes state that was created in the first stream upon initialization. We ensure that the state from the previous stream isn't kept. ### Was this patch authored or co-authored using generative AI tooling? Closes #44903 from ericm-db/deleteIfExists. 
Authored-by: Eric Marnadi Signed-off-by: Jungtaek Lim --- .../src/main/resources/error/error-classes.json| 11 +++ ...r-conditions-unsupported-feature-error-class.md | 4 + docs/sql-error-conditions.md | 6 ++ .../sql/streaming/StatefulProcessorHandle.scala| 6 ++ .../streaming/StatefulProcessorHandleImpl.scala| 12 +++ .../state/HDFSBackedStateStoreProvider.scala | 6 ++ .../sql/execution/streaming/state/RocksDB.scala| 16 .../state/RocksDBStateStoreProvider.scala | 5 + .../sql/execution/streaming/state/StateStore.scala | 6 ++ .../streaming/state/StateStoreErrors.scala | 22 + .../streaming/state/MemoryStateStore.scala | 4 + .../sql/streaming/TransformWithStateSuite.scala| 104 + 12 files changed, 202 insertions(+) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index baefb05a7070..136825ab374d 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3241,6 +3241,12 @@ ], "sqlState" : "0A000" }, + "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY" : { +"message" : [ + "Failed to remove default column family with reserved name=." +], +"sqlState" : "42802" + }, "STATE_STORE_MULTIPLE_VALUES_PER_KEY" : { "message" : [ "Store does not support multiple values per key" @@ -3950,6 +3956,11 @@ "Creating multiple column families with is not supported." ] }, + "STATE_STORE_REMOVING_COLUMN_FAMILIES" : { +"message" : [ + "Removing column families with is not supported." +] + }, "TABLE_OPERATION" : { "message" : [ "Table does not support . Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by \"spark.sql.catalog\"." 
diff --git a/docs/sql-error-conditions-unsupported-feature-error-class.md b/docs/sql-error-conditions-unsupported-feature-error-class.md index 1b12c4bfc1b3..8d42ecdce790 100644 --- a/docs/sql-error-conditions-unsupported-feature-error-class.md +++ b/docs/sql-error-conditions-unsupported-feature-error-class.md @@ -194,6 +194,10 @@ set PROPERTIES and DBPROPERTIES at the same time. Creating multiple column families with `` is not supported. +## STATE_STORE_REMOVING_COLUMN_FAMILIES + +Removing column families with `` is not supported. + ## TABLE_OPERATION Table `` does not support ``. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog". diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 3a2c4d261352..c704b1c10c46 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -2025,6 +2025,12 @@ The SQL config `` cannot be found. Please verify that the config exists Star (*) is not allowed in a select list when GROUP BY an ordinal position is used. +### STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY + +[SQLSTATE: 42802](sql-error-conditions-sqlstates.html
(spark) branch master updated: [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6f87fe2f513d [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC 6f87fe2f513d is described below commit 6f87fe2f513d1b1a022f0d03b6c81d73d7cfb228 Author: Martin Grund AuthorDate: Fri Feb 2 08:49:06 2024 -0400 [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC ### What changes were proposed in this pull request? This patch caches the result of the `df.schema` call in the DataFrame to avoid the extra roundtrip to the Spark Connect service to retrieve the columns or the schema. Since the Dataframe is immutable, the schema will not change. ### Why are the changes needed? Performance / Stability ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UT Closes #42499 from grundprinzip/SPARK-44815. Lead-authored-by: Martin Grund Co-authored-by: Herman van Hovell Co-authored-by: Martin Grund Signed-off-by: Herman van Hovell --- .../jvm/src/main/scala/org/apache/spark/sql/Dataset.scala | 13 - python/pyspark/sql/connect/dataframe.py | 12 ++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 949f53409386..9a42afebf8f2 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -244,7 +244,18 @@ class Dataset[T] private[sql] ( * @group basic * @since 3.4.0 */ - def schema: StructType = { + def schema: StructType = cachedSchema + + /** + * The cached schema. + * + * Schema caching is correct in most cases. Connect is lazy by nature. 
This means that we only + * resolve the plan when it is submitted for execution or analysis. We do not cache intermediate + * resolved plans. If the input (changes table, view redefinition, etc...) of the plan changes + * between the schema() call, and a subsequent action, the cached schema might be inconsistent + * with the end schema. + */ + private lazy val cachedSchema: StructType = { DataTypeProtoConverter .toCatalystType( sparkSession diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 6d37158142a6..4091016e0d59 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -120,6 +120,7 @@ class DataFrame: # Check whether _repr_html is supported or not, we use it to avoid calling RPC twice # by __repr__ and _repr_html_ while eager evaluation opens. self._support_repr_html = False +self._cached_schema: Optional[StructType] = None def __repr__(self) -> str: if not self._support_repr_html: @@ -1782,8 +1783,15 @@ class DataFrame: @property def schema(self) -> StructType: -query = self._plan.to_proto(self._session.client) -return self._session.client.schema(query) +# Schema caching is correct in most cases. Connect is lazy by nature. This means that +# we only resolve the plan when it is submitted for execution or analysis. We do not +# cache intermediate resolved plan. If the input (changes table, view redefinition, +# etc...) of the plan changes between the schema() call, and a subsequent action, the +# cached schema might be inconsistent with the end schema. +if self._cached_schema is None: +query = self._plan.to_proto(self._session.client) +self._cached_schema = self._session.client.schema(query) +return self._cached_schema schema.__doc__ = PySparkDataFrame.schema.__doc__ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-46946][SQL] Supporting broadcast of multiple filtering keys in DynamicPruning
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 22e5938aefc7 [SPARK-46946][SQL] Supporting broadcast of multiple filtering keys in DynamicPruning 22e5938aefc7 is described below commit 22e5938aefc784f50218a86e013e4c2247271072 Author: Thang Long VU AuthorDate: Fri Feb 2 19:55:34 2024 +0800 [SPARK-46946][SQL] Supporting broadcast of multiple filtering keys in DynamicPruning ### What changes were proposed in this pull request? This PR extends `DynamicPruningSubquery` to support broadcasting of multiple filtering keys (instead of one as before). The majority of the PR is to simply generalise singularity to plurality. **Note:** We actually do not use the multiple filtering keys `DynamicPruningSubquery` in this PR, we are doing this to make supporting DPP Null Safe Equality or multiple Equality predicates easier in the future. In Null Safe Equality JOIN, the JOIN condition `a <=> b` is transformed to `Coalesce(key1, Literal(key1.dataType)) = Coalesce(key2, Literal(key2.dataType)) AND IsNull(key1) = IsNull(key2)`. In order to have the highest pruning efficiency, we broadcast the 2 keys `Coalesce(key, Literal(key.dataType))` and `IsNull(key)` and use them to prune the other side at the same time. Before, the `DynamicPruningSubquery` only has one broadcasting key and we only supports DPP for one `EqualTo` JOIN predicate, now we are extending the subquery to multiple broadcasting keys. Please note that DPP has not been supported for multiple JOIN predicates. Put it in another way, at the moment, we don't insert a DPP Filter for multiple JOIN predicates at the same time, only potentially insert a DPP Filter for a given Equality JOIN predicate. ### Why are the changes needed? To make supporting DPP Null Safe Equality or DPP multiple Equality predicates easier in the future. 
### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44988 from longvu-db/multiple-broadcast-filtering-keys. Authored-by: Thang Long VU Signed-off-by: Wenchen Fan --- .../sql/catalyst/expressions/DynamicPruning.scala | 12 +-- .../expressions/DynamicPruningSubquerySuite.scala | 89 ++ .../execution/SubqueryAdaptiveBroadcastExec.scala | 2 +- .../sql/execution/SubqueryBroadcastExec.scala | 37 - .../PlanAdaptiveDynamicPruningFilters.scala| 8 +- .../adaptive/PlanAdaptiveSubqueries.scala | 4 +- .../dynamicpruning/PartitionPruning.scala | 15 ++-- .../dynamicpruning/PlanDynamicPruningFilters.scala | 9 ++- .../spark/sql/DynamicPartitionPruningSuite.scala | 2 +- 9 files changed, 138 insertions(+), 40 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala index ec6925eaa984..cc24a982d5d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala @@ -37,13 +37,13 @@ trait DynamicPruning extends Predicate * beneficial and so it should be executed even if it cannot reuse the results of the * broadcast through ReuseExchange; otherwise, it will use the filter only if it * can reuse the results of the broadcast through ReuseExchange - * @param broadcastKeyIndex the index of the filtering key collected from the broadcast + * @param broadcastKeyIndices the indices of the filtering keys collected from the broadcast */ case class DynamicPruningSubquery( pruningKey: Expression, buildQuery: LogicalPlan, buildKeys: Seq[Expression], -broadcastKeyIndex: Int, +broadcastKeyIndices: Seq[Int], onlyInBroadcast: Boolean, exprId: ExprId = NamedExpression.newExprId, hint: 
Option[HintInfo] = None) @@ -67,10 +67,12 @@ case class DynamicPruningSubquery( buildQuery.resolved && buildKeys.nonEmpty && buildKeys.forall(_.resolved) && - broadcastKeyIndex >= 0 && - broadcastKeyIndex < buildKeys.size && + broadcastKeyIndices.forall(idx => idx >= 0 && idx < buildKeys.size) && buildKeys.forall(_.references.subsetOf(buildQuery.outputSet)) && - pruningKey.dataType == buildKeys(broadcastKeyIndex).dataType + // DynamicPruningSubquery should only have a single broadcasting key since + // there are no usage for multiple broadcasting keys at the moment. + broadcastKeyIndices.size == 1 && + child.dataTyp
(spark) branch master updated: [SPARK-46949][SQL] Support CHAR/VARCHAR through ResolveDefaultColumns
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 362a4d43159d [SPARK-46949][SQL] Support CHAR/VARCHAR through ResolveDefaultColumns 362a4d43159d is described below commit 362a4d43159d73ef2e4c4f9267b572046220680f Author: Kent Yao AuthorDate: Fri Feb 2 17:39:12 2024 +0800 [SPARK-46949][SQL] Support CHAR/VARCHAR through ResolveDefaultColumns ### What changes were proposed in this pull request? We have issues resolving column definitions with default values, i.e., `c CHAR(5) DEFAULT 'foo'`, `v VARCHAR(10) DEFAULT 'bar'`. The reason is that CHAR/VARCHAR types in schemas are not supplanted with STRING type to align with the value expression. This PR fixes these issues in `ResolveDefaultColumns`, which seems to only cover the v1 tables. When I applied some related tests to v2 tables, they had the same issues. But beyond that, there is some other front-loading work that needs to be addressed. In this case, I'd like to separate v2 from the v1 PR. ### Why are the changes needed? bugfix ``` spark-sql (default)> CREATE TABLE t( c CHAR(5) DEFAULT 'spark') USING parquet; [INVALID_DEFAULT_VALUE.DATA_TYPE] Failed to execute CREATE TABLE command because the destination table column `c` has a DEFAULT value 'spark', which requires "CHAR(5)" type, but the statement provided a value of incompatible "STRING" type. ``` ### Does this PR introduce _any_ user-facing change? no, bugfix ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #44991 from yaooqinn/SPARK-46949. 
Authored-by: Kent Yao Signed-off-by: Kent Yao --- .../catalyst/util/ResolveDefaultColumnsUtil.scala | 26 +- .../spark/sql/ResolveDefaultColumnsSuite.scala | 41 ++ 2 files changed, 58 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala index e782e017d1ef..da03de73557f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException} import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ @@ -42,7 +42,9 @@ import org.apache.spark.util.ArrayImplicits._ /** * This object contains fields to help process DEFAULT columns. */ -object ResolveDefaultColumns extends QueryErrorsBase with ResolveDefaultColumnsUtils { +object ResolveDefaultColumns extends QueryErrorsBase + with ResolveDefaultColumnsUtils + with SQLConfHelper { // Name of attributes representing explicit references to the value stored in the above // CURRENT_DEFAULT_COLUMN_METADATA. 
val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT" @@ -307,25 +309,26 @@ object ResolveDefaultColumns extends QueryErrorsBase with ResolveDefaultColumnsU statementType: String, colName: String, defaultSQL: String): Expression = { +val supplanted = CharVarcharUtils.replaceCharVarcharWithString(dataType) // Perform implicit coercion from the provided expression type to the required column type. -if (dataType == analyzed.dataType) { +val ret = if (supplanted == analyzed.dataType) { analyzed -} else if (Cast.canUpCast(analyzed.dataType, dataType)) { - Cast(analyzed, dataType) +} else if (Cast.canUpCast(analyzed.dataType, supplanted)) { + Cast(analyzed, supplanted) } else { // If the provided default value is a literal of a wider type than the target column, but the // literal value fits within the narrower type, just coerce it for convenience. Exclude // boolean/array/struct/map types from consideration for this type coercion to avoid // surprising behavior like interpreting "false" as integer zero. val result = if (analyzed.isInstanceOf[Literal] && -!Seq(dataType, analyzed.dataType).exists(_ match { +!Seq(supplanted, analyzed.dataType).exists(_ match { case _: BooleanType | _: ArrayType | _: StructType | _: MapType => true case _ => false })) { try { -