(spark) branch master updated: [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 63b97c6ad82a [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider
63b97c6ad82a is described below

commit 63b97c6ad82afac71afcd64117346b6e0bda72bb
Author: Anish Shrigondekar
AuthorDate: Wed Feb 14 06:19:48 2024 +0900

    [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider

    ### What changes were proposed in this pull request?
    Add support for specifying key and value encoders separately, and also per column family, in the RocksDB state store provider.

    ### Why are the changes needed?
    This change allows us to specify encoders for keys and values separately and avoid encoding additional bytes. It also allows us to set schemas/encoders for individual column families, which will be required for future changes related to the transformWithState operator (listState/timer changes etc.).

    We are refactoring a bit here given the upcoming changes, so we are proposing to split the key and value encoders.

    Key encoders can be of 2 types:
    - with prefix scan
    - without prefix scan

    Value encoders can also eventually be of 2 types:
    - single value
    - multiple values (used for list state)

    We now also allow setting a schema and getting an encoder for each column family. After this change, we can potentially allow something like this:
    - col family 1 - keySchema with prefix scan, and valueSchema with a single value of binary type
    - col family 2 - keySchema without prefix scan, and valueSchema with multiple values

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added unit tests

    ```
    [info] Run completed in 3 minutes, 5 seconds.
    [info] Total number of tests run: 286
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 286, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #45038 from anishshri-db/task/SPARK-46979.

    Authored-by: Anish Shrigondekar
    Signed-off-by: Jungtaek Lim
---
 .../streaming/StatefulProcessorHandleImpl.scala    |   1 -
 .../sql/execution/streaming/ValueStateImpl.scala   |  20 ++--
 .../state/HDFSBackedStateStoreProvider.scala       |   6 +-
 .../streaming/state/RocksDBStateEncoder.scala      | 106 +++--
 .../state/RocksDBStateStoreProvider.scala          |  58 ---
 .../sql/execution/streaming/state/StateStore.scala |   6 +-
 .../streaming/state/MemoryStateStore.scala         |   7 +-
 .../streaming/state/RocksDBStateStoreSuite.scala   |  24 +
 8 files changed, 151 insertions(+), 77 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
index fed18fc7e458..62c97d11c926 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
@@ -115,7 +115,6 @@ class StatefulProcessorHandleImpl(
   override def getValueState[T](stateName: String): ValueState[T] = {
     verify(currState == CREATED, s"Cannot create state variable with name=$stateName after " +
       "initialization is complete")
-    store.createColFamilyIfAbsent(stateName)
     val resultState = new ValueStateImpl[T](store, stateName, keyEncoder)
     resultState
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
index 11ae7f65b43d..c1d807144df6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.types._
  * @param store - reference to the StateStore instance to be used for storing state
  * @param stateName - name of logical state partition
  * @param keyEnc - Spark SQL encoder for key
- * @tparam K - data type of key
  * @tparam S - data type of object that will be stored
  */
 class ValueStateImpl[S](
@@ -40,6 +39,16 @@ class ValueStateImpl[S](
     stateName: String,
     keyExprEnc: ExpressionEncoder[Any]) extends ValueState[S] with Logging {

+  private val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+  private val k
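The split described in the commit message above — key encoders with or without prefix scan, value encoders for a single value or multiple values, chosen independently per column family — can be sketched in plain Scala. This is an illustrative model only, assuming string keys and values; none of these class names exist in Spark's actual RocksDBStateStoreProvider.

```scala
// Hedged sketch, not Spark internals: one key/value encoder pair per column family.
object EncoderSketch {
  trait KeyEncoder { def encode(key: String): Array[Byte] }

  // Without prefix scan: the whole key is one opaque byte blob.
  class NoPrefixKeyEncoder extends KeyEncoder {
    def encode(key: String): Array[Byte] = key.getBytes("UTF-8")
  }

  // With prefix scan: write the prefix length first, so a range scan can
  // match rows that share the first `prefixLen` characters of the key.
  class PrefixScanKeyEncoder(prefixLen: Int) extends KeyEncoder {
    def encode(key: String): Array[Byte] = {
      val prefix = key.take(prefixLen).getBytes("UTF-8")
      val rest = key.drop(prefixLen).getBytes("UTF-8")
      Array(prefix.length.toByte) ++ prefix ++ rest
    }
  }

  trait ValueEncoder { def encode(values: Seq[String]): Array[Byte] }

  // Single value: no framing needed, avoiding extra encoded bytes.
  class SingleValueEncoder extends ValueEncoder {
    def encode(values: Seq[String]): Array[Byte] = values.head.getBytes("UTF-8")
  }

  // Multiple values (e.g. list state): length-prefix each element.
  class MultiValueEncoder extends ValueEncoder {
    def encode(values: Seq[String]): Array[Byte] =
      values.flatMap { v =>
        val b = v.getBytes("UTF-8")
        Array(b.length.toByte) ++ b
      }.toArray
  }

  // Each column family picks its own pair, mirroring the example in the PR text.
  val colFamilyEncoders: Map[String, (KeyEncoder, ValueEncoder)] = Map(
    "cf1" -> (new PrefixScanKeyEncoder(2), new SingleValueEncoder),
    "cf2" -> (new NoPrefixKeyEncoder, new MultiValueEncoder)
  )
}
```

The point of the split is that each column family only pays for the framing it actually needs: a single-value family skips the per-element length bytes entirely.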
(spark) branch branch-3.5 updated: [SPARK-47023][BUILD] Upgrade `aircompressor` to 1.26
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new a8c62d3f9a8d [SPARK-47023][BUILD] Upgrade `aircompressor` to 1.26
a8c62d3f9a8d is described below

commit a8c62d3f9a8de22f92e0e0ca1a5770f373b0b142
Author: Dongjoon Hyun
AuthorDate: Mon Feb 12 10:37:49 2024 -0800

    [SPARK-47023][BUILD] Upgrade `aircompressor` to 1.26

    This PR aims to upgrade `aircompressor` to 1.26.

    `aircompressor` v1.26 has the following bug fixes.
    - [Fix out of bounds read/write in Snappy decompressor](https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2)
    - [Fix ZstdOutputStream corruption on double close](https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2)

    No.

    Pass the CIs.

    No.

    Closes #45084 from dongjoon-hyun/SPARK-47023.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +-
 pom.xml                               | 5 +
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 9ab51dfa011a..c76702cd0af0 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -4,7 +4,7 @@ JTransforms/3.1//JTransforms-3.1.jar
 RoaringBitmap/0.9.45//RoaringBitmap-0.9.45.jar
 ST4/4.0.4//ST4-4.0.4.jar
 activation/1.1.1//activation-1.1.1.jar
-aircompressor/0.25//aircompressor-0.25.jar
+aircompressor/0.26//aircompressor-0.26.jar
 algebra_2.12/2.0.1//algebra_2.12-2.0.1.jar
 aliyun-java-sdk-core/4.5.10//aliyun-java-sdk-core-4.5.10.jar
 aliyun-java-sdk-kms/2.11.0//aliyun-java-sdk-kms-2.11.0.jar
diff --git a/pom.xml b/pom.xml
index 52505e6e1200..5db3c78e00eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2555,6 +2555,11 @@
       </dependency>
+      <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>aircompressor</artifactId>
+        <version>0.26</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.orc</groupId>
         <artifactId>orc-mapreduce</artifactId>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 49bcde612c59 [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`
49bcde612c59 is described below

commit 49bcde612c598fcf3c76cbd91a3dbf11d1b7f1b2
Author: Max Gekk
AuthorDate: Tue Feb 13 13:50:18 2024 +0300

    [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`

    ### What changes were proposed in this pull request?
    In the PR, I propose to use `checkError()` in tests of `sql` to check `SparkUnsupportedOperationException` and its fields.

    ### Why are the changes needed?
    Checking `SparkUnsupportedOperationException` and its fields, such as the error class and message parameters, prevents `SparkUnsupportedOperationException` from being replaced back with `UnsupportedOperationException`.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    By running the modified test suites.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #45082 from MaxGekk/intercept-UnsupportedOperationException-tests.

    Authored-by: Max Gekk
    Signed-off-by: Max Gekk
---
 .../test/scala/org/apache/spark/sql/RowTest.scala  |  6 +-
 .../spark/sql/catalyst/ScalaReflectionSuite.scala  | 23 +---
 .../encoders/EncoderErrorMessageSuite.scala        | 12 ++--
 .../catalyst/encoders/ExpressionEncoderSuite.scala | 10 ++--
 .../sql/catalyst/json/JacksonGeneratorSuite.scala  |  6 +-
 .../spark/sql/connector/catalog/CatalogSuite.scala |  4 +-
 .../sql/util/CaseInsensitiveStringMapSuite.scala   |  6 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 33 ++-
 .../spark/sql/ScalaReflectionRelationSuite.scala   | 29 +
 .../sql/connector/DataSourceV2FunctionSuite.scala  |  4 +-
 .../binaryfile/BinaryFileFormatSuite.scala         | 16 ++---
 .../datasources/v2/V2SessionCatalogSuite.scala     | 11 ++--
 .../streaming/CompactibleFileStreamLogSuite.scala  | 11 ++--
 .../sources/RatePerMicroBatchProviderSuite.scala   | 20 ---
 .../sources/RateStreamProviderSuite.scala          | 19 +++---
 .../streaming/sources/TextSocketStreamSuite.scala  | 12 ++--
 .../spark/sql/streaming/GroupStateSuite.scala      | 68 +++---
 17 files changed, 160 insertions(+), 130 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
index 840d80ffed13..985443773943 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
@@ -24,7 +24,7 @@ import org.scalatest.funspec.AnyFunSpec
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._

-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
 import org.apache.spark.sql.types._
@@ -45,10 +45,10 @@ class RowTest extends AnyFunSpec with Matchers {
   describe("Row (without schema)") {
     it("throws an exception when accessing by fieldName") {
-      intercept[UnsupportedOperationException] {
+      intercept[SparkUnsupportedOperationException] {
         noSchemaRow.fieldIndex("col1")
       }
-      intercept[UnsupportedOperationException] {
+      intercept[SparkUnsupportedOperationException] {
         noSchemaRow.getAs("col1")
       }
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index bbb62acd0250..daa8d12613f2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{typeTag, TypeTag}

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.FooEnum.FooEnum
 import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
@@ -490,17 +490,22 @@ class ScalaReflectionSuite extends SparkFunSuite {
   }

   test("SPARK-29026: schemaFor for trait without companion object throws exception ") {
-    val e = intercept[UnsupportedOperationException] {
-      schemaFor[TraitProductWithoutCompanion]
-    }
-    assert(e.getMessage.contains("Unable
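The `checkError()` pattern this commit moves the suites toward can be sketched as follows. This is a hypothetical test, assuming the `checkError` helper that Spark's `SparkFunSuite` test utilities provide; the `errorClass` value and parameter map below are illustrative placeholders, not taken from the diff above.

```scala
import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.Row

class RowFieldIndexSuite extends SparkFunSuite {
  // A row built without a schema, so name-based access cannot work.
  private val noSchemaRow = Row("value1", "value2")

  test("fieldIndex on a schema-less row reports a proper error class") {
    // Unlike a bare intercept[...], checkError pins down the error class and
    // message parameters, so the test fails if the typed Spark exception is
    // ever replaced back with a plain UnsupportedOperationException.
    checkError(
      exception = intercept[SparkUnsupportedOperationException] {
        noSchemaRow.fieldIndex("col1")
      },
      errorClass = "UNSUPPORTED_CALL", // illustrative; check error-classes.json
      parameters = Map("methodName" -> "fieldIndex", "className" -> "GenericRow"))
  }
}
```

The design choice is that asserting on structured fields (error class, parameters) is robust to message-text rewording, whereas `e.getMessage.contains(...)` checks break whenever the user-facing wording changes.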