anishshri-db commented on code in PR #47107:
URL: https://github.com/apache/spark/pull/47107#discussion_r1664948541
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -91,70 +96,106 @@ private[sql] class RocksDBStateStoreProvider
*/
override def valuesIterator(key: UnsafeRow, colFamilyName: String):
Iterator[UnsafeRow] = {
verify(key != null, "Key cannot be null")
+ ColumnFamilyUtils.verifyColFamilyOperations("valuesIterator", colFamilyName)
val kvEncoder = keyValueEncoderMap.get(colFamilyName)
val valueEncoder = kvEncoder._2
val keyEncoder = kvEncoder._1
verify(valueEncoder.supportsMultipleValuesPerKey, "valuesIterator
requires a encoder " +
"that supports multiple values for a single key.")
- val encodedKey = rocksDB.get(keyEncoder.encodeKey(key), colFamilyName)
- valueEncoder.decodeValues(encodedKey)
+
+ val encodedKey = keyEncoder.encodeKey(key, Option(colFamilyNameToIdMap.get(colFamilyName)))
+ val encodedValues = rocksDB.get(encodedKey)
+ valueEncoder.decodeValues(encodedValues)
}
override def merge(key: UnsafeRow, value: UnsafeRow,
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
verify(state == UPDATING, "Cannot merge after already committed or
aborted")
+ ColumnFamilyUtils.verifyColFamilyOperations("merge", colFamilyName)
val kvEncoder = keyValueEncoderMap.get(colFamilyName)
val keyEncoder = kvEncoder._1
val valueEncoder = kvEncoder._2
verify(valueEncoder.supportsMultipleValuesPerKey, "Merge operation
requires an encoder" +
" which supports multiple values for a single key")
verify(key != null, "Key cannot be null")
require(value != null, "Cannot merge a null value")
- rocksDB.merge(keyEncoder.encodeKey(key), valueEncoder.encodeValue(value), colFamilyName)
+
+ val encodedKey = keyEncoder.encodeKey(key, Option(colFamilyNameToIdMap.get(colFamilyName)))
+ rocksDB.merge(encodedKey, valueEncoder.encodeValue(value))
}
override def put(key: UnsafeRow, value: UnsafeRow, colFamilyName: String):
Unit = {
verify(state == UPDATING, "Cannot put after already committed or
aborted")
verify(key != null, "Key cannot be null")
require(value != null, "Cannot put a null value")
+ ColumnFamilyUtils.verifyColFamilyOperations("put", colFamilyName)
+
val kvEncoder = keyValueEncoderMap.get(colFamilyName)
- rocksDB.put(kvEncoder._1.encodeKey(key),
- kvEncoder._2.encodeValue(value), colFamilyName)
+ val encodedKey = kvEncoder._1.encodeKey(key, Option(colFamilyNameToIdMap.get(colFamilyName)))
+ rocksDB.put(encodedKey, kvEncoder._2.encodeValue(value))
}
override def remove(key: UnsafeRow, colFamilyName: String): Unit = {
verify(state == UPDATING, "Cannot remove after already committed or
aborted")
verify(key != null, "Key cannot be null")
+ ColumnFamilyUtils.verifyColFamilyOperations("remove", colFamilyName)
+
val kvEncoder = keyValueEncoderMap.get(colFamilyName)
- rocksDB.remove(kvEncoder._1.encodeKey(key), colFamilyName)
+ val encodedKey = kvEncoder._1.encodeKey(key, Option(colFamilyNameToIdMap.get(colFamilyName)))
+ rocksDB.remove(encodedKey)
}
override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = {
+ // Note this verify function only verify on the colFamilyName being valid,
+ // we are actually doing prefix when useColumnFamilies,
+ // but pass "iterator" to throw correct error message
+ ColumnFamilyUtils.verifyColFamilyOperations("iterator", colFamilyName)
val kvEncoder = keyValueEncoderMap.get(colFamilyName)
val rowPair = new UnsafeRowPair()
- rocksDB.iterator(colFamilyName).map { kv =>
- rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
- kvEncoder._2.decodeValue(kv.value))
- if (!isValidated && rowPair.value != null && !useColumnFamilies) {
- StateStoreProvider.validateStateRowFormat(
- rowPair.key, keySchema, rowPair.value, valueSchema, storeConf)
- isValidated = true
+
+ // As Virtual Column Family attach a column family prefix to the key row,
Review Comment:
nit: `attach a` → `attaches a` (subject-verb agreement with "Virtual Column Family")
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]