liviazhu-db commented on code in PR #50497:
URL: https://github.com/apache/spark/pull/50497#discussion_r2049379593
##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -43,14 +43,120 @@ private[sql] class RocksDBStateStoreProvider
with SupportsFineGrainedReplay {
import RocksDBStateStoreProvider._
- class RocksDBStateStore(lastVersion: Long) extends StateStore {
+ /**
+ * Implementation of a state store that uses RocksDB as the backing data store.
+ *
+ * This store implements a state machine with the following states:
+ * - UPDATING: The store is being updated and has not yet been committed or aborted
+ * - COMMITTED: Updates have been successfully committed
+ * - ABORTED: Updates have been aborted
+ *
+ * Operations are validated against the current state to ensure proper usage:
+ * - Get/put/remove/iterator operations are only allowed in UPDATING state
+ * - Commit is only allowed in UPDATING state
+ * - Abort is allowed in UPDATING or ABORTED state
+ * - Metrics retrieval is only allowed in COMMITTED or ABORTED state
+ *
+ * Each store instance is assigned a unique stamp when created, which is used to
+ * verify that operations are performed by the owning thread and to prevent
+ * concurrent modifications to the same store.
Review Comment:
"concurrent modifications to the same store" -> "concurrent modifications to
the same StateStoreProvider/NativeRocksDB instance" (store is overloaded)
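A minimal Scala sketch of the documented lifecycle, for readers following the thread; the enum and method names are illustrative stand-ins, not the PR's actual RocksDBStateStore code:

```scala
// Illustrative sketch only: operations are validated against the current
// lifecycle state, mirroring the rules listed in the docstring above.
object StoreState extends Enumeration {
  val UPDATING, COMMITTED, ABORTED = Value
}

class LifecycleSketch {
  @volatile private var state: StoreState.Value = StoreState.UPDATING

  private def validate(allowed: StoreState.Value*): Unit = {
    if (!allowed.contains(state)) {
      throw new IllegalStateException(s"Operation not allowed in state $state")
    }
  }

  def put(key: Array[Byte], value: Array[Byte]): Unit = {
    validate(StoreState.UPDATING)                       // get/put/remove/iterator: UPDATING only
    // ... write to the backing RocksDB instance ...
  }

  def commit(): Unit = {
    validate(StoreState.UPDATING)                       // commit: UPDATING only
    state = StoreState.COMMITTED
  }

  def abort(): Unit = {
    validate(StoreState.UPDATING, StoreState.ABORTED)   // abort may be called again (idempotent)
    state = StoreState.ABORTED
  }

  def metricsSummary: String = {
    validate(StoreState.COMMITTED, StoreState.ABORTED)  // metrics only after commit/abort
    s"finalState=$state"
  }
}
```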
##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##
@@ -126,6 +126,7 @@ abstract class StatePartitionReaderBase(
stateStoreColFamilySchema.keyStateEncoderSpec.get,
useMultipleValuesPerKey = useMultipleValuesPerKey,
isInternal = isInternal)
+ store.abort()
Review Comment:
Why do we abort here? Can you leave a comment?
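For illustration only (not necessarily the PR's rationale), the generic abort-after-read pattern, written against a hypothetical minimal interface rather than Spark's real StateStore API:

```scala
// Hypothetical minimal interface for the sketch; not Spark's actual classes.
trait SketchStore {
  def get(key: String): Option[String]
  def commit(): Long
  def abort(): Unit
}

// A store opened only to read existing state is released with abort(),
// because no updates were made that would be worth committing as a new version.
def readOne(openStore: Long => SketchStore, version: Long, key: String): Option[String] = {
  val store = openStore(version)
  try {
    store.get(key)   // read-only access; nothing is written
  } finally {
    store.abort()    // release the store without producing a new version
  }
}
```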
##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -444,17 +576,57 @@ private[sql] class RocksDBStateStoreProvider
override def stateStoreId: StateStoreId = stateStoreId_
- override def getStore(version: Long, uniqueId: Option[String] = None): StateStore = {
+ private lazy val stateMachine: RocksDBStateStoreProviderStateMachine =
+   new RocksDBStateStoreProviderStateMachine(stateStoreId, RocksDBConf(storeConf))
+
+ /**
+ * Creates and returns a state store with the specified parameters.
+ *
+ * @param version The version of the state store to load
+ * @param uniqueId Optional unique identifier for checkpoint
+ * @param readOnly Whether to open the store in read-only mode
+ * @param existingStore Optional existing store to reuse instead of creating a new one
+ * @return The loaded state store
+ */
+ private def loadStateStore(
+ version: Long,
+ uniqueId: Option[String],
+ readOnly: Boolean,
+ existingStore: Option[ReadStateStore] = None): StateStore = {
try {
if (version < 0) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
}
- rocksDB.load(
-   version,
-   stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None)
- new RocksDBStateStore(version)
- } catch {
+
+ // Determine stamp - either use existing or acquire new
+ val stamp = existingStore.map(_.getReadStamp).getOrElse {
Review Comment:
I think we don't need this anymore (since we don't create a new store if existingStore exists)
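A rough sketch of the reuse-vs-acquire pattern being discussed, with hypothetical names (ReadHandle, acquireStamp stand in for whatever the provider actually exposes):

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical read handle exposing the stamp it was opened with.
trait ReadHandle { def getReadStamp: Long }

class StampSketch {
  private val nextStamp = new AtomicLong(0L)

  private def acquireStamp(): Long = nextStamp.incrementAndGet()

  // Reuse the stamp of an existing read store when one is passed in,
  // otherwise acquire a fresh one. The review comment above suggests the
  // map(_.getReadStamp) arm may no longer be reachable if a new store is
  // never created while an existing store is present.
  def resolveStamp(existing: Option[ReadHandle]): Long =
    existing.map(_.getReadStamp).getOrElse(acquireStamp())
}
```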
##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProviderStateMachine.scala:
##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.errors.QueryExecutionErrors
+
+/**
+ * A state machine that manages the lifecycle of RocksDB state store instances.
+ *
+ * T