This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 25d96f7bacb4 [SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl 25d96f7bacb4 is described below commit 25d96f7bacb43a7d5a835454ecc075e40d4f3c93 Author: Eric Marnadi <eric.marn...@databricks.com> AuthorDate: Fri Feb 2 22:32:42 2024 +0900 [SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl ### What changes were proposed in this pull request? Adding the `deleteIfExists` method to the `StatefulProcessorHandle` in order to remove state variables from the State Store. Implemented only for RocksDBStateStoreProvider, as we do not currently support multiple column families for HDFS. ### Why are the changes needed? This functionality is needed so that users can remove state from the state store through the StatefulProcessorHandleImpl. ### Does this PR introduce _any_ user-facing change? Yes - this functionality (removing column families) was previously not supported from our RocksDB client. ### How was this patch tested? Added a unit test that creates two streams with the same checkpoint directory. The second stream removes state that was created in the first stream upon initialization. We ensure that the state from the previous stream isn't kept. ### Was this patch authored or co-authored using generative AI tooling? Closes #44903 from ericm-db/deleteIfExists. 
Authored-by: Eric Marnadi <eric.marn...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-classes.json | 11 +++ ...r-conditions-unsupported-feature-error-class.md | 4 + docs/sql-error-conditions.md | 6 ++ .../sql/streaming/StatefulProcessorHandle.scala | 6 ++ .../streaming/StatefulProcessorHandleImpl.scala | 12 +++ .../state/HDFSBackedStateStoreProvider.scala | 6 ++ .../sql/execution/streaming/state/RocksDB.scala | 16 ++++ .../state/RocksDBStateStoreProvider.scala | 5 + .../sql/execution/streaming/state/StateStore.scala | 6 ++ .../streaming/state/StateStoreErrors.scala | 22 +++++ .../streaming/state/MemoryStateStore.scala | 4 + .../sql/streaming/TransformWithStateSuite.scala | 104 +++++++++++++++++++++ 12 files changed, 202 insertions(+) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index baefb05a7070..136825ab374d 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3241,6 +3241,12 @@ ], "sqlState" : "0A000" }, + "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY" : { + "message" : [ + "Failed to remove default column family with reserved name=<colFamilyName>." + ], + "sqlState" : "42802" + }, "STATE_STORE_MULTIPLE_VALUES_PER_KEY" : { "message" : [ "Store does not support multiple values per key" @@ -3950,6 +3956,11 @@ "Creating multiple column families with <stateStoreProvider> is not supported." ] }, + "STATE_STORE_REMOVING_COLUMN_FAMILIES" : { + "message" : [ + "Removing column families with <stateStoreProvider> is not supported." + ] + }, "TABLE_OPERATION" : { "message" : [ "Table <tableName> does not support <operation>. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by \"spark.sql.catalog\"." 
diff --git a/docs/sql-error-conditions-unsupported-feature-error-class.md b/docs/sql-error-conditions-unsupported-feature-error-class.md index 1b12c4bfc1b3..8d42ecdce790 100644 --- a/docs/sql-error-conditions-unsupported-feature-error-class.md +++ b/docs/sql-error-conditions-unsupported-feature-error-class.md @@ -194,6 +194,10 @@ set PROPERTIES and DBPROPERTIES at the same time. Creating multiple column families with `<stateStoreProvider>` is not supported. +## STATE_STORE_REMOVING_COLUMN_FAMILIES + +Removing column families with `<stateStoreProvider>` is not supported. + ## TABLE_OPERATION Table `<tableName>` does not support `<operation>`. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog". diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 3a2c4d261352..c704b1c10c46 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -2025,6 +2025,12 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists Star (*) is not allowed in a select list when GROUP BY an ordinal position is used. +### STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY + +[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Failed to remove default column family with reserved name=`<colFamilyName>`. 
+ ### STATE_STORE_MULTIPLE_VALUES_PER_KEY [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala index 5eaccceb947c..738928b5cc36 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala @@ -40,4 +40,10 @@ private[sql] trait StatefulProcessorHandle extends Serializable { /** Function to return queryInfo for currently running task */ def getQueryInfo(): QueryInfo + + /** + * Function to delete and purge state variable if defined previously + * @param stateName - name of the state variable + */ + def deleteIfExists(stateName: String): Unit } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index d0cd8f7dc0a3..d06938ffeafb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -122,4 +122,16 @@ class StatefulProcessorHandleImpl( } override def getQueryInfo(): QueryInfo = currQueryInfo + + /** + * Function to delete and purge state variable if defined previously + * + * @param stateName - name of the state variable + */ + override def deleteIfExists(stateName: String): Unit = { + verify(currState == CREATED, s"Cannot delete state variable with name=$stateName after " + + "initialization is complete") + store.removeColFamilyIfExists(stateName) + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 842c4004820c..ffb618d0fbb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -201,6 +201,12 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with override def toString(): String = { s"HDFSStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]" } + + override def removeColFamilyIfExists(colFamilyName: String): Unit = { + throw StateStoreErrors.removingColumnFamiliesNotSupported( + "HDFSBackedStateStoreProvider") + + } } def getMetricsForProvider(): Map[String, Long] = synchronized { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index cf453394ba47..bf1a1c50d350 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -264,6 +264,22 @@ class RocksDB( } } + /** + * Remove RocksDB column family, if exists + */ + def removeColFamilyIfExists(colFamilyName: String): Unit = { + if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) { + throw StateStoreErrors.cannotRemoveDefaultColumnFamily(colFamilyName) + } + + if (checkColFamilyExists(colFamilyName)) { + assert(db != null) + val handle = colFamilyNameToHandleMap(colFamilyName) + db.dropColumnFamily(handle) + colFamilyNameToHandleMap.remove(colFamilyName) + } + } + /** * Get the value for the given key if present, or null. * @note This will return the last written value even if it was uncommitted. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index d38e21aac181..e469fd4fe1c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -189,6 +189,11 @@ private[sql] class RocksDBStateStoreProvider /** Return the [[RocksDB]] instance in this store. This is exposed mainly for testing. */ def dbInstance(): RocksDB = rocksDB + + /** Remove column family if exists */ + override def removeColFamilyIfExists(colFamilyName: String): Unit = { + rocksDB.removeColFamilyIfExists(colFamilyName) + } } override def init( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index a8af14e82230..4b409b8a66b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -103,6 +103,12 @@ trait ReadStateStore { * double resource cleanup. */ trait StateStore extends ReadStateStore { + + /** + * Remove column family with given name, if present. + */ + def removeColFamilyIfExists(colFamilyName: String): Unit + /** * Create column family with given name, if absent. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 665dafc6f66a..bbc6d4c78f90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -37,6 +37,16 @@ object StateStoreErrors { new StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider) } + def removingColumnFamiliesNotSupported(stateStoreProvider: String): + StateStoreRemovingColumnFamiliesNotSupportedException = { + new StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider) + } + + def cannotRemoveDefaultColumnFamily(colFamilyName: String): + StateStoreCannotRemoveDefaultColumnFamily = { + new StateStoreCannotRemoveDefaultColumnFamily(colFamilyName) + } + def unsupportedOperationException(operationName: String, entity: String): StateStoreUnsupportedOperationException = { new StateStoreUnsupportedOperationException(operationName, entity) @@ -48,6 +58,18 @@ class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES", messageParameters = Map("stateStoreProvider" -> stateStoreProvider) ) +class StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider: String) + extends SparkUnsupportedOperationException( + errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_REMOVING_COLUMN_FAMILIES", + messageParameters = Map("stateStoreProvider" -> stateStoreProvider) + ) + +class StateStoreCannotRemoveDefaultColumnFamily(colFamilyName: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY", + messageParameters = Map("colFamilyName" -> colFamilyName) + ) + class StateStoreUnsupportedOperationException(operationType: String, 
entity: String) extends SparkUnsupportedOperationException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala index 5229865122be..02052d307f41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala @@ -33,6 +33,10 @@ class MemoryStateStore extends StateStore() { throw StateStoreErrors.multipleColumnFamiliesNotSupported("MemoryStateStoreProvider") } + override def removeColFamilyIfExists(colFamilyName: String): Unit = { + throw StateStoreErrors.removingColumnFamiliesNotSupported("MemoryStateStoreProvider") + } + override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = map.get(key) override def put(key: UnsafeRow, newValue: UnsafeRow, colFamilyName: String): Unit = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index 70a71f745066..7a6c3f00fc7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -58,6 +58,73 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S override def close(): Unit = {} } +class RunningCountMostRecentStatefulProcessor + extends StatefulProcessor[String, (String, String), (String, String, String)] + with Logging { + @transient private var _countState: ValueState[Long] = _ + @transient private var _mostRecent: ValueState[String] = _ + @transient var _processorHandle: StatefulProcessorHandle = _ + + override def init( + handle: StatefulProcessorHandle, + outputMode: OutputMode) : Unit = { + _processorHandle = handle + 
assert(handle.getQueryInfo().getBatchId >= 0) + _countState = _processorHandle.getValueState[Long]("countState") + _mostRecent = _processorHandle.getValueState[String]("mostRecent") + } + + override def handleInputRows( + key: String, + inputRows: Iterator[(String, String)], + timerValues: TimerValues): Iterator[(String, String, String)] = { + val count = _countState.getOption().getOrElse(0L) + 1 + val mostRecent = _mostRecent.getOption().getOrElse("") + + var output = List[(String, String, String)]() + inputRows.foreach { row => + _mostRecent.update(row._2) + _countState.update(count) + output = (key, count.toString, mostRecent) :: output + } + output.iterator + } + + override def close(): Unit = {} +} + +class MostRecentStatefulProcessorWithDeletion + extends StatefulProcessor[String, (String, String), (String, String)] + with Logging { + @transient private var _mostRecent: ValueState[String] = _ + @transient var _processorHandle: StatefulProcessorHandle = _ + + override def init( + handle: StatefulProcessorHandle, + outputMode: OutputMode) : Unit = { + _processorHandle = handle + assert(handle.getQueryInfo().getBatchId >= 0) + _processorHandle.deleteIfExists("countState") + _mostRecent = _processorHandle.getValueState[String]("mostRecent") + } + + override def handleInputRows( + key: String, + inputRows: Iterator[(String, String)], + timerValues: TimerValues): Iterator[(String, String)] = { + val mostRecent = _mostRecent.getOption().getOrElse("") + + var output = List[(String, String)]() + inputRows.foreach { row => + _mostRecent.update(row._2) + output = (key, mostRecent) :: output + } + output.iterator + } + + override def close(): Unit = {} +} + class RunningCountStatefulProcessorWithError extends RunningCountStatefulProcessor { @transient private var _tempState: ValueState[Long] = _ @@ -129,6 +196,43 @@ class TransformWithStateSuite extends StateStoreMetricsTest ) } } + + test("transformWithState - test deleteIfExists operator") { + 
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.SHUFFLE_PARTITIONS.key -> + TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) { + withTempDir { chkptDir => + val dirPath = chkptDir.getCanonicalPath + val inputData = MemoryStream[(String, String)] + val stream1 = inputData.toDS() + .groupByKey(x => x._1) + .transformWithState(new RunningCountMostRecentStatefulProcessor(), + TimeoutMode.NoTimeouts(), + OutputMode.Update()) + + val stream2 = inputData.toDS() + .groupByKey(x => x._1) + .transformWithState(new MostRecentStatefulProcessorWithDeletion(), + TimeoutMode.NoTimeouts(), + OutputMode.Update()) + + testStream(stream1, OutputMode.Update())( + StartStream(checkpointLocation = dirPath), + AddData(inputData, ("a", "str1")), + CheckNewAnswer(("a", "1", "")), + StopStream + ) + testStream(stream2, OutputMode.Update())( + StartStream(checkpointLocation = dirPath), + AddData(inputData, ("a", "str2"), ("b", "str3")), + CheckNewAnswer(("a", "str1"), + ("b", "")), // should not factor in previous count state + StopStream + ) + } + } + } } class TransformWithStateValidationSuite extends StateStoreMetricsTest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org