HeartSaVioR commented on code in PR #47778:
URL: https://github.com/apache/spark/pull/47778#discussion_r1721346399
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -126,14 +126,16 @@ trait StateStore extends ReadStateStore {
/**
* Create column family with given name, if absent.
+ *
+ * @return column family ID
*/
def createColFamilyIfAbsent(
colFamilyName: String,
keySchema: StructType,
valueSchema: StructType,
keyStateEncoderSpec: KeyStateEncoderSpec,
useMultipleValuesPerKey: Boolean = false,
- isInternal: Boolean = false): Unit
+ isInternal: Boolean = false): Short
Review Comment:
Can we keep the concept of VCF inside RocksDB and not expose it outside of the
state store? I asked not to break the layering - VCF must be bound to the
details of the state store provider.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -447,9 +440,9 @@ private[sql] class RocksDBStateStoreProvider
private val keyValueEncoderMap = new
java.util.concurrent.ConcurrentHashMap[String,
(RocksDBKeyStateEncoder, RocksDBValueStateEncoder)]
- private val colFamilyNameToIdMap = new
java.util.concurrent.ConcurrentHashMap[String, Short]
- // TODO SPARK-48796 load column family id from state schema when restarting
- private val colFamilyId = new AtomicInteger(0)
+
Review Comment:
nit: unnecessary empty line
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -606,6 +608,8 @@ object StateStore extends Logging {
val DEFAULT_COL_FAMILY_NAME = "default"
+ val DEFAULT_COL_FAMILY_ID: Short = 0
Review Comment:
This is not needed if we don't expose this. Again, the caller should be able to
just deal with the column family name.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -447,9 +440,9 @@ private[sql] class RocksDBStateStoreProvider
private val keyValueEncoderMap = new
java.util.concurrent.ConcurrentHashMap[String,
(RocksDBKeyStateEncoder, RocksDBValueStateEncoder)]
- private val colFamilyNameToIdMap = new
java.util.concurrent.ConcurrentHashMap[String, Short]
- // TODO SPARK-48796 load column family id from state schema when restarting
- private val colFamilyId = new AtomicInteger(0)
+
+ private val multColFamiliesDisabledStr = "multiple column families is
disabled in " +
Review Comment:
nit: mult`i`ColFamiliesDisabledStr
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -166,6 +170,83 @@ class RocksDB(
@GuardedBy("acquireLock")
@volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+ // This is accessed and updated only between load and acquire
Review Comment:
nit: personally, the code comment makes me more confused. The comment says
"between load and acquire" - what is "acquire" here? Do you mean "release"?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1090,6 +1098,45 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+
+ test("verify that column family id is assigned correctly after removal") {
+ tryWithProviderResource(newStoreProvider(useColumnFamilies = true)) {
provider =>
+ var store = provider.getStore(0)
+ val colFamily1: String = "abc"
+ val colFamily2: String = "def"
+ val colFamily3: String = "ghi"
+ val colFamily4: String = "jkl"
+ val colFamily5: String = "mno"
+ store.createColFamilyIfAbsent(colFamily1, keySchema, valueSchema,
+ NoPrefixKeyStateEncoderSpec(keySchema))
+ store.createColFamilyIfAbsent(colFamily2, keySchema, valueSchema,
+ NoPrefixKeyStateEncoderSpec(keySchema))
+ store.commit()
+
+ store = provider.getStore(1)
+ store.removeColFamilyIfExists(colFamily2)
+ store.commit()
+
+ store = provider.getStore(2)
+ assert(store.createColFamilyIfAbsent(colFamily3, keySchema, valueSchema,
Review Comment:
Shall we expose a test-scope method to retrieve the association between the
column family name and the column family ID instead? It should only be needed
in the RocksDB State Store.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -166,6 +170,83 @@ class RocksDB(
@GuardedBy("acquireLock")
@volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+ // This is accessed and updated only between load and acquire
+ // which means it is implicitly guarded by acquireLock
+ @GuardedBy("acquireLock")
+ private val colFamilyNameToIdMap = new ConcurrentHashMap[String, Short]()
+
+ @GuardedBy("acquireLock")
+ private val defaultColumnFamilyIdMapping =
Review Comment:
This does not need to be bound to StateStore - Column Family is a concept of
StateStore, but VCF shouldn't be exposed outside of the RocksDB state store
provider. COLUMN_FAMILY_`ID` should be bound to RocksDB.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -550,6 +641,12 @@ class RocksDB(
} finally {
changelogWriter = None
}
+ // If we have changed the columnFamilyId mapping, we have set a new
Review Comment:
Shall we cancel the changelog writer when we upload a snapshot instead of
writing both? It's still not harmful if we succeed in committing the changelog
but fail to upload the snapshot, assuming the batch will be marked as failed,
but it's still redundant to commit the changelog.
Also, the changelog and the snapshot are not exactly the same (not just a
pure replacement), so having both a changelog and a snapshot for the same
version adds more confusion, and in the worst case, the snapshot file could be
lost for any reason and the query would take the changelog and lose the
information about the column family update.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]