HeartSaVioR commented on code in PR #49304:
URL: https://github.com/apache/spark/pull/49304#discussion_r1957541951
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -678,7 +654,7 @@ object RocksDBStateStoreProvider {
// Version as a single byte that specifies the encoding of the row data in RocksDB
val STATE_ENCODING_NUM_VERSION_BYTES = 1
val STATE_ENCODING_VERSION: Byte = 0
- val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2
Review Comment:
This should be here, rather than in StateStore. I think it was put in the right place.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -847,35 +828,47 @@ class RocksDBStateStoreChangeDataReader(
endVersion: Long,
compressionCodec: CompressionCodec,
keyValueEncoderMap:
- ConcurrentHashMap[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)],
+ ConcurrentHashMap[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder, Short)],
colFamilyNameOpt: Option[String] = None)
extends StateStoreChangeDataReader(
fm, stateLocation, startVersion, endVersion, compressionCodec,
colFamilyNameOpt) {
override protected var changelogSuffix: String = "changelog"
- private def getColFamilyIdBytes: Option[Array[Byte]] = {
- if (colFamilyNameOpt.isDefined) {
- val colFamilyName = colFamilyNameOpt.get
- if (!keyValueEncoderMap.containsKey(colFamilyName)) {
- throw new IllegalStateException(
- s"Column family $colFamilyName not found in the key value encoder
map")
- }
- Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
- } else {
- None
- }
+ /**
+ * Encodes a virtual column family ID into a byte array suitable for RocksDB.
+ *
+ * This method creates a fixed-size byte array prefixed with the virtual column family ID,
+ * which is used to partition data within RocksDB.
+ *
+ * @param virtualColFamilyId The column family identifier to encode
+ * @return A byte array containing the encoded column family ID
+ */
+ private def getColumnFamilyIdBytes(virtualColFamilyId: Short): Array[Byte] = {
+ val encodedBytes = new Array[Byte](StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES)
+ Platform.putShort(encodedBytes, Platform.BYTE_ARRAY_OFFSET, virtualColFamilyId)
+ encodedBytes
}
- private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+ private def getExtractedKey(data: Array[Byte]): Array[Byte] = {
Review Comment:
ditto
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1814,18 +1663,17 @@ class NoPrefixKeyStateEncoder(
rowBytes.length
)
- encodeStateRowWithPrefix(dataWithVersion)
+ dataWithVersion
}
}
override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
if (!useColumnFamilies) {
- dataEncoder.decodeKey(decodeStateRowData(keyBytes))
+ dataEncoder.decodeKey(keyBytes)
Review Comment:
ditto, if the above turns out to be a bug.
If it is a bug, this cannot read existing checkpoints written by prior Spark versions.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -656,31 +813,75 @@ class RocksDB(
*
* @note This update is not committed to disk until commit() is called.
*/
- def merge(key: Array[Byte], value: Array[Byte]): Unit = {
- if (conf.trackTotalNumberOfRows) {
- val oldValue = db.get(readOptions, key)
- if (oldValue == null) {
- numKeysOnWritingVersion += 1
+ def merge(
+ key: Array[Byte],
+ value: Array[Byte],
+ cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ val keyWithPrefix = if (useColumnFamilies) {
+ encodeStateRowWithPrefix(key, cfName)
+ } else {
+ key
+ }
+
+ if (useColumnFamilies) {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(readOptions, keyWithPrefix)
+ if (oldValue == null) {
+ val cfInfo = getColumnFamilyInfo(cfName)
+ if (cfInfo.isInternal) {
+ numInternalKeysOnWritingVersion += 1
+ } else {
+ numKeysOnWritingVersion += 1
+ }
+ }
+ }
+ } else {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(readOptions, keyWithPrefix)
+ if (oldValue == null) {
+ numKeysOnWritingVersion += 1
+ }
}
}
- db.merge(writeOptions, key, value)
- changelogWriter.foreach(_.merge(key, value))
+ db.merge(writeOptions, keyWithPrefix, value)
+ changelogWriter.foreach(_.merge(keyWithPrefix, value))
}
/**
* Remove the key if present.
* @note This update is not committed to disk until commit() is called.
*/
- def remove(key: Array[Byte]): Unit = {
- if (conf.trackTotalNumberOfRows) {
- val value = db.get(readOptions, key)
- if (value != null) {
- numKeysOnWritingVersion -= 1
+ def remove(key: Array[Byte], cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ val keyWithPrefix = if (useColumnFamilies) {
+ encodeStateRowWithPrefix(key, cfName)
+ } else {
+ key
+ }
+
+ if (useColumnFamilies) {
Review Comment:
nit: the extracted method above can handle both directions (putting/removing) with a flag. I'd also be OK with two separate extracted methods if that is clearer.
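For the record, a rough sketch of the single-helper version I had in mind; the helper name and flag are made up, the fields and calls are the ones already in this PR:

```scala
// Hypothetical extraction: adjust the key counters for both put/merge and remove.
private def updateKeyCountIfNeeded(
    keyWithPrefix: Array[Byte],
    cfName: String,
    isPutOrMerge: Boolean): Unit = {
  if (conf.trackTotalNumberOfRows) {
    val existingValue = db.get(readOptions, keyWithPrefix)
    // put/merge counts keys that did not exist before; remove counts keys that did.
    val shouldUpdate = if (isPutOrMerge) existingValue == null else existingValue != null
    if (shouldUpdate) {
      val delta = if (isPutOrMerge) 1 else -1
      if (useColumnFamilies && getColumnFamilyInfo(cfName).isInternal) {
        numInternalKeysOnWritingVersion += delta
      } else {
        numKeysOnWritingVersion += delta
      }
    }
  }
}
```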
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -918,18 +968,34 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
// Check that snapshots and changelogs get purged correctly.
db.doMaintenance()
- assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
+
+ if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
+ assert(snapshotVersionsPresent(remoteDir) === Seq(31, 60, 60))
+ } else {
+ assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
+ }
if (enableStateStoreCheckpointIds) {
// recommit version 60 creates another changelog file with different unique id
- assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60)
+ if (colFamiliesEnabled) {
+ assert(changelogVersionsPresent(remoteDir) === (31 to 60) :+ 60)
+ } else {
+ assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60)
+ }
} else {
assert(changelogVersionsPresent(remoteDir) === (30 to 60))
}
// Verify the content of retained versions.
- for (version <- 30 to 60) {
- db.load(version, versionToUniqueId.get(version), readOnly = true)
- assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+ if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
+ for (version <- 31 to 60) {
Review Comment:
nit: please leave a comment explaining why version 30 does not work, and what the state value is at version 30 in this case. From reading the test code, I'm not sure why there is a difference.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -847,35 +828,47 @@ class RocksDBStateStoreChangeDataReader(
endVersion: Long,
compressionCodec: CompressionCodec,
keyValueEncoderMap:
- ConcurrentHashMap[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)],
+ ConcurrentHashMap[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder, Short)],
colFamilyNameOpt: Option[String] = None)
extends StateStoreChangeDataReader(
fm, stateLocation, startVersion, endVersion, compressionCodec,
colFamilyNameOpt) {
override protected var changelogSuffix: String = "changelog"
- private def getColFamilyIdBytes: Option[Array[Byte]] = {
- if (colFamilyNameOpt.isDefined) {
- val colFamilyName = colFamilyNameOpt.get
- if (!keyValueEncoderMap.containsKey(colFamilyName)) {
- throw new IllegalStateException(
- s"Column family $colFamilyName not found in the key value encoder
map")
- }
- Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
- } else {
- None
- }
+ /**
+ * Encodes a virtual column family ID into a byte array suitable for RocksDB.
+ *
+ * This method creates a fixed-size byte array prefixed with the virtual column family ID,
+ * which is used to partition data within RocksDB.
+ *
+ * @param virtualColFamilyId The column family identifier to encode
+ * @return A byte array containing the encoded column family ID
+ */
+ private def getColumnFamilyIdBytes(virtualColFamilyId: Short): Array[Byte] = {
Review Comment:
We now have two different places handling cfId, here and in RocksDB. Shall we consolidate, or extract this out into a new class/object if there is no good way to consolidate?
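A rough illustration of the kind of shared helper I mean; the object name is hypothetical, the constant and encoding are the ones from this PR:

```scala
import org.apache.spark.unsafe.Platform

// Hypothetical shared home for the virtual column family prefix handling,
// referenced by both RocksDB and RocksDBStateStoreProvider.
object VirtualColumnFamilyUtils {
  val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2

  def getColumnFamilyIdBytes(virtualColFamilyId: Short): Array[Byte] = {
    val encodedBytes = new Array[Byte](VIRTUAL_COL_FAMILY_PREFIX_BYTES)
    Platform.putShort(encodedBytes, Platform.BYTE_ARRAY_OFFSET, virtualColFamilyId)
    encodedBytes
  }
}
```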
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1834,7 +1834,6 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
store = provider.getRocksDBStateStore(2)
store.createColFamilyIfAbsent(colFamily3, keySchema, valueSchema,
NoPrefixKeyStateEncoderSpec(keySchema))
- assert(store.getColumnFamilyId(colFamily3) == 3)
Review Comment:
I'm not sure this test is still valid after you removed the check for the column family ID. What's the rationale for removing this?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1791,16 +1644,12 @@ class RangeKeyScanStateEncoder(
class NoPrefixKeyStateEncoder(
dataEncoder: RocksDBDataEncoder,
keySchema: StructType,
- useColumnFamilies: Boolean = false,
- columnFamilyInfo: Option[ColumnFamilyInfo] = None)
- extends StateRowPrefixEncoder(
- useColumnFamilies,
- columnFamilyInfo
- ) with RocksDBKeyStateEncoder with Logging {
+ useColumnFamilies: Boolean = false)
+ extends RocksDBKeyStateEncoder with Logging {
override def encodeKey(row: UnsafeRow): Array[Byte] = {
if (!useColumnFamilies) {
Review Comment:
I was wondering why useColumnFamilies is still needed for KeyStateEncoder now that the virtual column family is moved out of the encoder... and found this possible bug.
My understanding is that in the else branch (useColumnFamilies = true) we write the version byte "twice", because `dataEncoder.encodeKey(row)` already puts the version byte. If this is a bug and you just need to call `dataEncoder.encodeKey(row)`, then `useColumnFamilies` isn't used by any implementation of KeyStateEncoder and you can remove it.
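If that turns out to be right, the encoder could collapse to something like this (sketch only, assuming dataEncoder already handles the version byte in both directions):

```scala
// Sketch: no column family handling and no version-byte prefixing in the encoder itself.
override def encodeKey(row: UnsafeRow): Array[Byte] = {
  // dataEncoder.encodeKey already writes the version byte, so no extra prefixing here.
  dataEncoder.encodeKey(row)
}

override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
  dataEncoder.decodeKey(keyBytes)
}
```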
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -467,29 +492,62 @@ class RocksDB(
this
}
+ /**
+ * Function to check if col family is internal or not based on information recorded in
+ * checkpoint metadata.
+ * @param cfName - column family name
+ * @param metadata - checkpoint metadata
+ * @return - type of column family (internal or otherwise)
+ */
+ private def checkColFamilyType(
Review Comment:
nit: since this returns a Boolean, it'd be ideal to name it as isXXX so that people can understand what true and false map to. Just my 2 cents, and it's OK to leave it as is if a name of that form isn't clearer.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -918,18 +968,34 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
// Check that snapshots and changelogs get purged correctly.
db.doMaintenance()
- assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
+
+ if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
Review Comment:
nit: isn't this duplicated with the next if statement?
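For example, the snapshot assertion could fold into the existing branch on enableStateStoreCheckpointIds; a sketch using the expected values from this test:

```scala
// Sketch: one branch structure for both the snapshot and changelog assertions.
if (enableStateStoreCheckpointIds) {
  if (colFamiliesEnabled) {
    assert(snapshotVersionsPresent(remoteDir) === Seq(31, 60, 60))
    // recommit of version 60 creates another changelog file with a different unique id
    assert(changelogVersionsPresent(remoteDir) === (31 to 60) :+ 60)
  } else {
    assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
    assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60)
  }
} else {
  assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
  assert(changelogVersionsPresent(remoteDir) === (30 to 60))
}
```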
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -629,7 +627,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
if (isChangelogCheckpointingEnabled) {
assert(changelogVersionsPresent(remoteDir) === (1 to 50))
- assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 5))
+ if (colFamiliesEnabled) {
+ assert(snapshotVersionsPresent(remoteDir) ===
+ Seq(1, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50))
Review Comment:
Wait, we should already have been setting the flag to enforce a snapshot on a column family change before this fix. Does this mean we had a bug?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -621,28 +700,106 @@ class RocksDB(
}
}
+ /**
+ * Function to encode state row with virtual col family id prefix
+ * @param data - passed byte array to be stored in state store
+ * @param cfName - name of column family
+ * @return - encoded byte array with virtual column family id prefix
+ */
+ private def encodeStateRowWithPrefix(
+ data: Array[Byte],
+ cfName: String): Array[Byte] = {
+ // Create result array big enough for all prefixes plus data
+ val result = new Array[Byte](StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES + data.length)
+ val offset = Platform.BYTE_ARRAY_OFFSET + StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES
+
+ val cfInfo = getColumnFamilyInfo(cfName)
+ Platform.putShort(result, Platform.BYTE_ARRAY_OFFSET, cfInfo.cfId)
+
+ // Write the actual data
+ Platform.copyMemory(
+ data, Platform.BYTE_ARRAY_OFFSET,
+ result, offset,
+ data.length
+ )
+
+ result
+ }
+
+ /**
+ * Function to decode state row with virtual col family id prefix
+ * @param data - passed byte array retrieved from state store
+ * @return - pair of decoded byte array without virtual column family id prefix
+ * and name of column family
+ */
+ private def decodeStateRowWithPrefix(data: Array[Byte]): (Array[Byte], String) = {
+ val cfId = Platform.getShort(data, Platform.BYTE_ARRAY_OFFSET)
+ val cfName = getColumnFamilyNameForId(cfId)
+ val offset = Platform.BYTE_ARRAY_OFFSET + StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES
+
+ val key = new Array[Byte](data.length - StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES)
+ Platform.copyMemory(
+ data, offset,
+ key, Platform.BYTE_ARRAY_OFFSET,
+ key.length
+ )
+
+ (key, cfName)
+ }
+
/**
* Get the value for the given key if present, or null.
* @note This will return the last written value even if it was uncommitted.
*/
- def get(key: Array[Byte]): Array[Byte] = {
- db.get(readOptions, key)
+ def get(
+ key: Array[Byte],
+ cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] = {
+ val keyWithPrefix = if (useColumnFamilies) {
+ encodeStateRowWithPrefix(key, cfName)
+ } else {
+ key
+ }
+
+ db.get(readOptions, keyWithPrefix)
}
/**
* Put the given value for the given key.
* @note This update is not committed to disk until commit() is called.
*/
- def put(key: Array[Byte], value: Array[Byte]): Unit = {
- if (conf.trackTotalNumberOfRows) {
- val oldValue = db.get(readOptions, key)
- if (oldValue == null) {
- numKeysOnWritingVersion += 1
+ def put(
+ key: Array[Byte],
+ value: Array[Byte],
+ cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ val keyWithPrefix = if (useColumnFamilies) {
+ encodeStateRowWithPrefix(key, cfName)
+ } else {
+ key
+ }
+
+ if (useColumnFamilies) {
Review Comment:
nit: This is the same as in merge; extract?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -178,55 +183,77 @@ class RocksDB(
// This is accessed and updated only between load and commit
// which means it is implicitly guarded by acquireLock
@GuardedBy("acquireLock")
- private val colFamilyNameToIdMap = new ConcurrentHashMap[String, Short]()
+ private val colFamilyNameToInfoMap = new ConcurrentHashMap[String, ColumnFamilyInfo]()
+
+ @GuardedBy("acquireLock")
+ private val colFamilyIdToNameMap = new ConcurrentHashMap[Short, String]()
@GuardedBy("acquireLock")
private val maxColumnFamilyId: AtomicInteger = new AtomicInteger(-1)
@GuardedBy("acquireLock")
private val shouldForceSnapshot: AtomicBoolean = new AtomicBoolean(false)
- /**
- * Check whether the column family name is for internal column families.
- *
- * @param cfName - column family name
- * @return - true if the column family is for internal use, false otherwise
- */
- private def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_'
+ private def getColumnFamilyInfo(cfName: String): ColumnFamilyInfo = {
+ colFamilyNameToInfoMap.get(cfName)
+ }
+
+ private def getColumnFamilyNameForId(cfId: Short): String = {
+ colFamilyIdToNameMap.get(cfId)
+ }
- // Methods to fetch column family mapping for this State Store version
- def getColumnFamilyMapping: Map[String, Short] = {
- colFamilyNameToIdMap.asScala
+ private def addToColFamilyMaps(cfName: String, cfId: Short, isInternal: Boolean): Unit = {
+ colFamilyNameToInfoMap.putIfAbsent(cfName, ColumnFamilyInfo(cfId, isInternal))
+ colFamilyIdToNameMap.putIfAbsent(cfId, cfName)
+ }
+
+ private def removeFromColFamilyMaps(cfName: String): Unit = {
+ val colFamilyInfo = colFamilyNameToInfoMap.get(cfName)
+ if (colFamilyInfo != null) {
+ colFamilyNameToInfoMap.remove(cfName)
+ colFamilyIdToNameMap.remove(colFamilyInfo.cfId)
+ }
}
- def getColumnFamilyId(cfName: String): Short = {
- colFamilyNameToIdMap.get(cfName)
+ private def clearColFamilyMaps(): Unit = {
+ colFamilyNameToInfoMap.clear()
+ colFamilyIdToNameMap.clear()
}
/**
- * Create RocksDB column family, if not created already
+ * Check if the column family exists with given name and create one if it doesn't. Users can
+ * create external column families storing user facing data as well as internal column families
+ * such as secondary indexes. Metrics for both of these types are tracked separately.
+ *
+ * @param colFamilyName - column family name
+ * @param isInternal - whether the column family is for internal use or not
+ * @return - virtual column family id
*/
- def createColFamilyIfAbsent(colFamilyName: String): Short = {
+ def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean): Short = {
if (!checkColFamilyExists(colFamilyName)) {
val newColumnFamilyId = maxColumnFamilyId.incrementAndGet().toShort
- colFamilyNameToIdMap.putIfAbsent(colFamilyName, newColumnFamilyId)
+ addToColFamilyMaps(colFamilyName, newColumnFamilyId, isInternal)
shouldForceSnapshot.set(true)
newColumnFamilyId
} else {
- colFamilyNameToIdMap.get(colFamilyName)
+ colFamilyNameToInfoMap.get(colFamilyName).cfId
}
}
/**
* Remove RocksDB column family, if exists
* @return columnFamilyId if it exists, else None
*/
- def removeColFamilyIfExists(colFamilyName: String): Option[Short] = {
+ def removeColFamilyIfExists(colFamilyName: String): Boolean = {
if (checkColFamilyExists(colFamilyName)) {
shouldForceSnapshot.set(true)
- Some(colFamilyNameToIdMap.remove(colFamilyName))
+ prefixScan(Array.empty[Byte], colFamilyName).foreach { kv =>
Review Comment:
Likewise, as I commented above, `iterator(colFamilyName)` would be a lot easier to understand (a higher-level operation).
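A sketch of what I mean; the `iterator(cfName)` overload itself is the suggestion, everything else is from this PR:

```scala
def removeColFamilyIfExists(colFamilyName: String): Boolean = {
  if (checkColFamilyExists(colFamilyName)) {
    shouldForceSnapshot.set(true)
    // Higher-level scan over just this column family, instead of an empty-prefix prefixScan.
    iterator(colFamilyName).foreach { kv =>
      remove(kv.key, colFamilyName)
    }
    removeFromColFamilyMaps(colFamilyName)
    true
  } else {
    false
  }
}
```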
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -381,9 +381,7 @@ class RocksDBStateEncoderSuite extends SparkFunSuite {
keyStateEncoderSpec,
valueSchema,
Some(testProvider),
- Some(ColumnFamilyInfo(StateStore.DEFAULT_COL_FAMILY_NAME, 0))
- )
- new AvroStateEncoder(keyStateEncoderSpec, valueSchema, None, None)
Review Comment:
Nice finding!
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -629,7 +627,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
if (isChangelogCheckpointingEnabled) {
assert(changelogVersionsPresent(remoteDir) === (1 to 50))
- assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 5))
+ if (colFamiliesEnabled) {
+ assert(snapshotVersionsPresent(remoteDir) ===
+ Seq(1, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50))
Review Comment:
nit: Probably easier to see the diff with `Seq(1) ++ Range.inclusive(5, 50, 5)`.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -700,7 +901,13 @@ class RocksDB(
new NextIterator[ByteArrayPair] {
override protected def getNext(): ByteArrayPair = {
if (iter.isValid) {
- byteArrayPair.set(iter.key, iter.value)
+ val key = if (useColumnFamilies) {
+ decodeStateRowWithPrefix(iter.key)._1
Review Comment:
This concerns me a lot, since not giving the column family ID/name back to the caller makes the outcome useless for the multiple column family case. Say column families cfA and cfB happen to have the same key keyA; the result of a global scan then carries no meaning.
That's why I left the comment above.
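To make it concrete, one possible shape (names are purely illustrative) that keeps the column family attached to each decoded pair:

```scala
// Hypothetical cf-aware scan result that does not drop the column family name.
case class ColFamilyByteArrayPair(cfName: String, key: Array[Byte], value: Array[Byte])

def iteratorWithColFamily(): Iterator[ColFamilyByteArrayPair] = {
  val iter = db.newIterator()
  iter.seekToFirst()
  new NextIterator[ColFamilyByteArrayPair] {
    override protected def getNext(): ColFamilyByteArrayPair = {
      if (iter.isValid) {
        // Decode the virtual column family prefix and surface the name to the caller.
        val (key, cfName) = decodeStateRowWithPrefix(iter.key)
        val result = ColFamilyByteArrayPair(cfName, key, iter.value)
        iter.next()
        result
      } else {
        finished = true
        null
      }
    }
    override protected def close(): Unit = iter.close()
  }
}
```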
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1074,14 +1140,25 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
db.load(version, versionToUniqueId.get(version))
assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
}
+
for (version <- 31 to 60) {
db.load(version - 1, versionToUniqueId.get(version - 1))
db.put(version.toString, version.toString)
db.remove((version - 1).toString)
db.commit()
}
assert(changelogVersionsPresent(remoteDir) === (1 to 30))
- assert(snapshotVersionsPresent(remoteDir) === (31 to 60))
+
+ var result: Seq[Long] = if (colFamiliesEnabled) {
+ Seq(1)
+ } else {
+ Seq.empty
+ }
+
+ (31 to 60).foreach { i =>
Review Comment:
nit: doesn't this work? `result ++ (31 to 60)`
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -902,7 +946,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
db.remove((version - 1).toString)
db.commit()
}
- assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
+
+ if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
+ assert(snapshotVersionsPresent(remoteDir) === (1 to 30) :+ 30 :+ 31)
Review Comment:
Is this because version 30 is committed twice, and snapshots do not overwrite each other in checkpoint v2?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -702,6 +696,8 @@ object StateStore extends Logging {
val DEFAULT_COL_FAMILY_NAME = "default"
+ val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2
Review Comment:
This is bound only to one StateStore "implementation" (RocksDBStateStoreProvider); it is not used by the StateStore interface or the base implementation.
Move this to RocksDBStateStoreProvider, or to some utility class referenced by both RocksDB and RocksDBStateStoreProvider.
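For example, roughly restoring its previous placement next to the other encoding constants (or the same constant in a shared utility object, as in my other comment):

```scala
object RocksDBStateStoreProvider {
  // Version as a single byte that specifies the encoding of the row data in RocksDB
  val STATE_ENCODING_NUM_VERSION_BYTES = 1
  val STATE_ENCODING_VERSION: Byte = 0
  val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2
}
```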
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -656,31 +803,75 @@ class RocksDB(
*
* @note This update is not committed to disk until commit() is called.
*/
- def merge(key: Array[Byte], value: Array[Byte]): Unit = {
- if (conf.trackTotalNumberOfRows) {
- val oldValue = db.get(readOptions, key)
- if (oldValue == null) {
- numKeysOnWritingVersion += 1
+ def merge(
+ key: Array[Byte],
+ value: Array[Byte],
+ cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ val keyWithPrefix = if (useColumnFamilies) {
+ encodeStateRowWithPrefix(key, cfName)
+ } else {
+ key
+ }
+
+ if (useColumnFamilies) {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(readOptions, keyWithPrefix)
+ if (oldValue == null) {
+ val cfInfo = getColumnFamilyInfo(cfName)
+ if (cfInfo.isInternal) {
+ numInternalKeysOnWritingVersion += 1
+ } else {
+ numKeysOnWritingVersion += 1
+ }
+ }
+ }
+ } else {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(readOptions, keyWithPrefix)
+ if (oldValue == null) {
+ numKeysOnWritingVersion += 1
+ }
}
}
- db.merge(writeOptions, key, value)
- changelogWriter.foreach(_.merge(key, value))
+ db.merge(writeOptions, keyWithPrefix, value)
+ changelogWriter.foreach(_.merge(keyWithPrefix, value))
}
/**
* Remove the key if present.
* @note This update is not committed to disk until commit() is called.
*/
- def remove(key: Array[Byte]): Unit = {
- if (conf.trackTotalNumberOfRows) {
- val value = db.get(readOptions, key)
- if (value != null) {
- numKeysOnWritingVersion -= 1
+ def remove(key: Array[Byte], cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ val keyWithPrefix = if (useColumnFamilies) {
+ encodeStateRowWithPrefix(key, cfName)
+ } else {
+ key
+ }
+
+ if (useColumnFamilies) {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(readOptions, keyWithPrefix)
+ if (oldValue != null) {
+ val cfInfo = getColumnFamilyInfo(cfName)
+ if (cfInfo.isInternal) {
+ numInternalKeysOnWritingVersion -= 1
+ } else {
+ numKeysOnWritingVersion -= 1
+ }
+ }
+ }
+ } else {
+ if (conf.trackTotalNumberOfRows) {
+ val value = db.get(readOptions, keyWithPrefix)
+ if (value != null) {
+ numKeysOnWritingVersion -= 1
+ }
}
}
- db.delete(writeOptions, key)
- changelogWriter.foreach(_.delete(key))
+
+ db.delete(writeOptions, keyWithPrefix)
+ changelogWriter.foreach(_.delete(keyWithPrefix))
}
/**
Review Comment:
I'm saying that the UX now differs between this method and all the others. Most methods were changed to be column family aware, while this one wasn't.
For the multiple column families case we do not expect a global scan over the entire key space, so this method is mostly unused there. Callers now have to use `prefixScan()` where they previously used `iterator()`, which leaks the implementation detail of how we construct the virtual column family prefix. I'd rather say it'd be better to have a column family aware version of `iterator()`.
I'll leave it to you to decide which is better: 1) having both `iterator()` and `iterator(cfName: String)`, leaving `iterator()` to perform a full scan regardless of multiple column families, or 2) only having `iterator(cfName: String)`, and deciding whether to do a full scan or simply delegate to `prefixScan(cfName)` based on the internal flag (multiple column families).
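A sketch of option 2, assuming the existing `prefixScan(prefix, cfName)` from this PR; the delegation itself is the suggestion:

```scala
// Column family aware iterator: when multiple column families are enabled,
// scanning one virtual column family is just a prefix scan over its id prefix.
def iterator(cfName: String): Iterator[ByteArrayPair] = {
  if (useColumnFamilies) {
    prefixScan(Array.empty[Byte], cfName)
  } else {
    iterator()
  }
}
```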
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]