(spark) branch master updated: [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider

2024-02-13 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 63b97c6ad82a [SPARK-46979][SS] Add support for specifying key and 
value encoder separately and also for each col family in RocksDB state store 
provider
63b97c6ad82a is described below

commit 63b97c6ad82afac71afcd64117346b6e0bda72bb
Author: Anish Shrigondekar 
AuthorDate: Wed Feb 14 06:19:48 2024 +0900

[SPARK-46979][SS] Add support for specifying key and value encoder 
separately and also for each col family in RocksDB state store provider

### What changes were proposed in this pull request?
Add support for specifying the key and value encoders separately, and per column family, in the RocksDB state store provider.

### Why are the changes needed?
This change allows us to specify the encoders for keys and values separately and avoid encoding additional bytes. It also allows us to set schemas/encoders for individual column families, which will be required for future changes related to the transformWithState operator (listState/timer changes, etc.).

We are refactoring a bit here given the upcoming changes, so we are proposing to split the key and value encoders (a sketch of the resulting split follows the two lists below).
Key encoders can be of 2 types:
- with prefix scan
- without prefix scan

Value encoders can also eventually be of 2 types:
- single value
- multiple values (used for list state)
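
For illustration, a minimal sketch of what the split could look like, assuming hypothetical trait and method names (the actual interfaces live in `RocksDBStateEncoder.scala` and may differ):
```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hedged sketch only: the shape of the key/value encoder split,
// not the exact Spark internals.
trait RocksDBKeyStateEncoder {
  def encodeKey(row: UnsafeRow): Array[Byte]
  def decodeKey(keyBytes: Array[Byte]): UnsafeRow
  // True only for the prefix-scan variant of the key encoder.
  def supportPrefixKeyScan: Boolean
}

trait RocksDBValueStateEncoder {
  def encodeValue(row: UnsafeRow): Array[Byte]
  def decodeValue(valueBytes: Array[Byte]): UnsafeRow
  // Would be true for the eventual multi-value (list state) variant.
  def supportsMultipleValuesPerKey: Boolean
}
```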

We now also allow setting the schema and getting the encoder for each column family, so after the change we can potentially allow something like this (see the usage sketch after this list):
- col family 1: key schema with prefix scan, value schema with a single value of binary type
- col family 2: key schema without prefix scan, value schema with multiple values
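
A hedged usage sketch of the above, assuming a `store: StateStore` in scope and a `createColFamilyIfAbsent` overload that accepts schemas (the exact signature here is an assumption, not the verified API):
```scala
import org.apache.spark.sql.types.{BinaryType, StructType}

val keySchema = new StructType().add("key", BinaryType)
val valueSchema = new StructType().add("value", BinaryType)

// col family 1: prefix-scannable key, single binary value per key
store.createColFamilyIfAbsent("family1", keySchema, numColsPrefixKey = 1, valueSchema)
// col family 2: no prefix scan, multiple values per key (future list state)
store.createColFamilyIfAbsent("family2", keySchema, numColsPrefixKey = 0, valueSchema)
```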

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests
```
[info] Run completed in 3 minutes, 5 seconds.
[info] Total number of tests run: 286
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 286, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45038 from anishshri-db/task/SPARK-46979.

Authored-by: Anish Shrigondekar 
Signed-off-by: Jungtaek Lim 
---
 .../streaming/StatefulProcessorHandleImpl.scala|   1 -
 .../sql/execution/streaming/ValueStateImpl.scala   |  20 ++--
 .../state/HDFSBackedStateStoreProvider.scala   |   6 +-
 .../streaming/state/RocksDBStateEncoder.scala  | 106 +++--
 .../state/RocksDBStateStoreProvider.scala  |  58 ---
 .../sql/execution/streaming/state/StateStore.scala |   6 +-
 .../streaming/state/MemoryStateStore.scala |   7 +-
 .../streaming/state/RocksDBStateStoreSuite.scala   |  24 +
 8 files changed, 151 insertions(+), 77 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
index fed18fc7e458..62c97d11c926 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
@@ -115,7 +115,6 @@ class StatefulProcessorHandleImpl(
   override def getValueState[T](stateName: String): ValueState[T] = {
     verify(currState == CREATED, s"Cannot create state variable with name=$stateName after " +
       "initialization is complete")
-    store.createColFamilyIfAbsent(stateName)
     val resultState = new ValueStateImpl[T](store, stateName, keyEncoder)
     resultState
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
index 11ae7f65b43d..c1d807144df6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.types._
  * @param store - reference to the StateStore instance to be used for storing state
  * @param stateName - name of logical state partition
  * @param keyEnc - Spark SQL encoder for key
- * @tparam K - data type of key
  * @tparam S - data type of object that will be stored
  */
 class ValueStateImpl[S](
@@ -40,6 +39,16 @@ class ValueStateImpl[S](
     stateName: String,
     keyExprEnc: ExpressionEncoder[Any]) extends ValueState[S] with Logging {
 
+  private val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+  private val k

(spark) branch branch-3.5 updated: [SPARK-47023][BUILD] Upgrade `aircompressor` to 0.26

2024-02-13 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new a8c62d3f9a8d [SPARK-47023][BUILD] Upgrade `aircompressor` to 0.26
a8c62d3f9a8d is described below

commit a8c62d3f9a8de22f92e0e0ca1a5770f373b0b142
Author: Dongjoon Hyun 
AuthorDate: Mon Feb 12 10:37:49 2024 -0800

[SPARK-47023][BUILD] Upgrade `aircompressor` to 0.26

### What changes were proposed in this pull request?
This PR aims to upgrade `aircompressor` to 0.26, matching the dependency manifest below (the "1.26" in the original subject does not correspond to any released `aircompressor` version).

### Why are the changes needed?
`aircompressor` v0.26 has the following bug fixes.

- [Fix out of bounds read/write in Snappy decompressor](https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2)
- [Fix ZstdOutputStream corruption on double close](https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45084 from dongjoon-hyun/SPARK-47023.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +-
 pom.xml   | 5 +
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 9ab51dfa011a..c76702cd0af0 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -4,7 +4,7 @@ JTransforms/3.1//JTransforms-3.1.jar
 RoaringBitmap/0.9.45//RoaringBitmap-0.9.45.jar
 ST4/4.0.4//ST4-4.0.4.jar
 activation/1.1.1//activation-1.1.1.jar
-aircompressor/0.25//aircompressor-0.25.jar
+aircompressor/0.26//aircompressor-0.26.jar
 algebra_2.12/2.0.1//algebra_2.12-2.0.1.jar
 aliyun-java-sdk-core/4.5.10//aliyun-java-sdk-core-4.5.10.jar
 aliyun-java-sdk-kms/2.11.0//aliyun-java-sdk-kms-2.11.0.jar
diff --git a/pom.xml b/pom.xml
index 52505e6e1200..5db3c78e00eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2555,6 +2555,11 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>aircompressor</artifactId>
+        <version>0.26</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.orc</groupId>
         <artifactId>orc-mapreduce</artifactId>





(spark) branch master updated: [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`

2024-02-13 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 49bcde612c59 [SPARK-47028][SQL][TESTS] Check 
`SparkUnsupportedOperationException` instead of `UnsupportedOperationException`
49bcde612c59 is described below

commit 49bcde612c598fcf3c76cbd91a3dbf11d1b7f1b2
Author: Max Gekk 
AuthorDate: Tue Feb 13 13:50:18 2024 +0300

[SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` 
instead of `UnsupportedOperationException`

### What changes were proposed in this pull request?
In the PR, I propose to use `checkError()` in `sql` tests to check `SparkUnsupportedOperationException` and its fields.

### Why are the changes needed?
Checking `SparkUnsupportedOperationException` and its fields, such as the error class and message parameters, prevents `SparkUnsupportedOperationException` from being silently replaced back with a plain `UnsupportedOperationException`.
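
As an illustration, a hedged sketch of the `checkError()` pattern inside a `SparkFunSuite`-based test (the error class and parameters below are assumptions for the no-schema `Row` case, not the verified values):
```scala
// Sketch of the test pattern; errorClass/parameters are illustrative.
checkError(
  exception = intercept[SparkUnsupportedOperationException] {
    noSchemaRow.fieldIndex("col1")
  },
  errorClass = "UNSUPPORTED_CALL",
  parameters = Map("methodName" -> "fieldIndex", "className" -> "Row"))
```
Unlike a bare `intercept[UnsupportedOperationException]`, this fails if the error class or its message parameters change, not just if the exception type does.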

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suites.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45082 from MaxGekk/intercept-UnsupportedOperationException-tests.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 .../test/scala/org/apache/spark/sql/RowTest.scala  |  6 +-
 .../spark/sql/catalyst/ScalaReflectionSuite.scala  | 23 +---
 .../encoders/EncoderErrorMessageSuite.scala| 12 ++--
 .../catalyst/encoders/ExpressionEncoderSuite.scala | 10 ++--
 .../sql/catalyst/json/JacksonGeneratorSuite.scala  |  6 +-
 .../spark/sql/connector/catalog/CatalogSuite.scala |  4 +-
 .../sql/util/CaseInsensitiveStringMapSuite.scala   |  6 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 33 ++-
 .../spark/sql/ScalaReflectionRelationSuite.scala   | 29 +
 .../sql/connector/DataSourceV2FunctionSuite.scala  |  4 +-
 .../binaryfile/BinaryFileFormatSuite.scala | 16 ++---
 .../datasources/v2/V2SessionCatalogSuite.scala | 11 ++--
 .../streaming/CompactibleFileStreamLogSuite.scala  | 11 ++--
 .../sources/RatePerMicroBatchProviderSuite.scala   | 20 ---
 .../sources/RateStreamProviderSuite.scala  | 19 +++---
 .../streaming/sources/TextSocketStreamSuite.scala  | 12 ++--
 .../spark/sql/streaming/GroupStateSuite.scala  | 68 +++---
 17 files changed, 160 insertions(+), 130 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
index 840d80ffed13..985443773943 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
@@ -24,7 +24,7 @@ import org.scalatest.funspec.AnyFunSpec
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
 import org.apache.spark.sql.types._
@@ -45,10 +45,10 @@ class RowTest extends AnyFunSpec with Matchers {
 
   describe("Row (without schema)") {
     it("throws an exception when accessing by fieldName") {
-      intercept[UnsupportedOperationException] {
+      intercept[SparkUnsupportedOperationException] {
         noSchemaRow.fieldIndex("col1")
       }
-      intercept[UnsupportedOperationException] {
+      intercept[SparkUnsupportedOperationException] {
         noSchemaRow.getAs("col1")
       }
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index bbb62acd0250..daa8d12613f2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{typeTag, TypeTag}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.FooEnum.FooEnum
 import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
@@ -490,17 +490,22 @@ class ScalaReflectionSuite extends SparkFunSuite {
   }
 
   test("SPARK-29026: schemaFor for trait without companion object throws exception ") {
-    val e = intercept[UnsupportedOperationException] {
-      schemaFor[TraitProductWithoutCompanion]
-    }
-    assert(e.getMessage.contains("Unable