HeartSaVioR commented on code in PR #49304:
URL: https://github.com/apache/spark/pull/49304#discussion_r1926469488
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -724,20 +723,21 @@ class AvroStateEncoder(
keyStateEncoderSpec: KeyStateEncoderSpec,
valueSchema: StructType,
stateSchemaProvider: Option[StateSchemaProvider],
- columnFamilyInfo: Option[ColumnFamilyInfo]
+ columnFamilyName: Option[String]
Review Comment:
Why not just have `columnFamilyName: String` rather than asserting? Do we
create the instance of this class dynamically?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -702,6 +696,8 @@ object StateStore extends Logging {
val DEFAULT_COL_FAMILY_NAME = "default"
+ val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2
Review Comment:
I guess this is bound to RocksDB and don't need to be coupled with
StateStore.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -237,22 +272,22 @@ class RocksDB(
* @return - true if the column family exists, false otherwise
*/
def checkColFamilyExists(colFamilyName: String): Boolean = {
- colFamilyNameToIdMap.containsKey(colFamilyName)
+ db != null && colFamilyNameToInfoMap.containsKey(colFamilyName)
}
// This method sets the internal column family metadata to
// the default values it should be set to on load
private def setInitialCFInfo(): Unit = {
- colFamilyNameToIdMap.clear()
+ clearColFamilyMaps()
shouldForceSnapshot.set(false)
maxColumnFamilyId.set(0)
}
def getColFamilyCount(isInternal: Boolean): Long = {
if (isInternal) {
-
colFamilyNameToIdMap.asScala.keys.toSeq.count(checkInternalColumnFamilies)
+
colFamilyNameToInfoMap.asScala.keys.toSeq.count(checkInternalColumnFamilies)
} else {
-
colFamilyNameToIdMap.asScala.keys.toSeq.count(!checkInternalColumnFamilies(_))
+
colFamilyNameToInfoMap.asScala.keys.toSeq.count(!checkInternalColumnFamilies(_))
Review Comment:
Would this work to remove if statement?
```
colFamilyNameToInfoMap.asScala.keys.toSeq.count(checkInternalColumnFamilies(_)
== isInternal)
```
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -621,28 +690,106 @@ class RocksDB(
}
}
+ /**
+ * Function to encode state row with virtual col family id prefix
+ * @param data - passed byte array to be stored in state store
+ * @param cfName - name of column family
+ * @return - encoded byte array with virtual column family id prefix
+ */
+ private def encodeStateRowWithPrefix(
+ data: Array[Byte],
+ cfName: String): Array[Byte] = {
+ // Create result array big enough for all prefixes plus data
+ val result = new Array[Byte](StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES +
data.length)
+ val offset = Platform.BYTE_ARRAY_OFFSET +
StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES
+
+ val cfInfo = getColumnFamilyInfo(cfName)
+ Platform.putShort(result, Platform.BYTE_ARRAY_OFFSET, cfInfo.cfId)
+
+ // Write the actual data
+ Platform.copyMemory(
+ data, Platform.BYTE_ARRAY_OFFSET,
+ result, offset,
+ data.length
+ )
+
+ result
+ }
+
+ /**
+ * Function to decode state row with virtual col family id prefix
+ * @param data - passed byte array retrieved from state store
+ * @return - pair of decoded byte array without virtual column family id
prefix
+ * and name of column family
+ */
+ private def decodeStateRowWithPrefix(data: Array[Byte]): (Array[Byte],
String) = {
Review Comment:
ditto
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -931,7 +941,8 @@ case class RocksDBCheckpointMetadata(
sstFiles: Seq[RocksDBSstFile],
logFiles: Seq[RocksDBLogFile],
numKeys: Long,
- columnFamilyMapping: Option[Map[String, Short]] = None,
+ numInternalKeys: Long,
+ columnFamilyMapping: Option[Map[String, ColumnFamilyInfo]] = None,
Review Comment:
This is a breaking change as the existing field gets different
representation for json. Shall we check whether we ever release the version
before the change, to ensure we are not breaking the checkpoint? Let's also
check with the latest bugfix version of Spark 3.5.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -237,22 +272,22 @@ class RocksDB(
* @return - true if the column family exists, false otherwise
*/
def checkColFamilyExists(colFamilyName: String): Boolean = {
- colFamilyNameToIdMap.containsKey(colFamilyName)
+ db != null && colFamilyNameToInfoMap.containsKey(colFamilyName)
Review Comment:
Mind explaining why we check for db instance?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -621,28 +690,106 @@ class RocksDB(
}
}
+ /**
+ * Function to encode state row with virtual col family id prefix
+ * @param data - passed byte array to be stored in state store
+ * @param cfName - name of column family
+ * @return - encoded byte array with virtual column family id prefix
+ */
+ private def encodeStateRowWithPrefix(
Review Comment:
Shall we revisit the method name? It's not very clear we will **prefix vcf
ID** to the **key row** from the method name.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -923,6 +924,15 @@ object RocksDBFileManagerMetrics {
val EMPTY_METRICS = RocksDBFileManagerMetrics(0L, 0L, 0L, None)
}
+/**
+ * Case class to keep track of column family info within checkpoint metadata.
+ * @param cfId - virtual column family id
+ * @param isInternal - whether the column family is internal or not
+ */
+case class ColumnFamilyInfo(
+ cfId: Short,
+ isInternal: Boolean)
Review Comment:
It'd be ideal to make clear which is the source of truth - we seem to use
the naming convention (starting with `_`) to determine the internal column
family. Is it always reliable or we should look at the flag? If we decide the
source of truth for this, I'd say the other should not be used to determine
whether the cf is internal or not.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -656,31 +803,75 @@ class RocksDB(
*
* @note This update is not committed to disk until commit() is called.
*/
- def merge(key: Array[Byte], value: Array[Byte]): Unit = {
- if (conf.trackTotalNumberOfRows) {
- val oldValue = db.get(readOptions, key)
- if (oldValue == null) {
- numKeysOnWritingVersion += 1
+ def merge(
+ key: Array[Byte],
+ value: Array[Byte],
+ cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ val keyWithPrefix = if (useColumnFamilies) {
+ encodeStateRowWithPrefix(key, cfName)
+ } else {
+ key
+ }
+
+ if (useColumnFamilies) {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(readOptions, keyWithPrefix)
+ if (oldValue == null) {
+ val cfInfo = getColumnFamilyInfo(cfName)
+ if (cfInfo.isInternal) {
+ numInternalKeysOnWritingVersion += 1
+ } else {
+ numKeysOnWritingVersion += 1
+ }
+ }
+ }
+ } else {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(readOptions, keyWithPrefix)
+ if (oldValue == null) {
+ numKeysOnWritingVersion += 1
+ }
}
}
- db.merge(writeOptions, key, value)
- changelogWriter.foreach(_.merge(key, value))
+ db.merge(writeOptions, keyWithPrefix, value)
+ changelogWriter.foreach(_.merge(keyWithPrefix, value))
}
/**
* Remove the key if present.
* @note This update is not committed to disk until commit() is called.
*/
- def remove(key: Array[Byte]): Unit = {
- if (conf.trackTotalNumberOfRows) {
- val value = db.get(readOptions, key)
- if (value != null) {
- numKeysOnWritingVersion -= 1
+ def remove(key: Array[Byte], cfName: String =
StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ val keyWithPrefix = if (useColumnFamilies) {
+ encodeStateRowWithPrefix(key, cfName)
+ } else {
+ key
+ }
+
+ if (useColumnFamilies) {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(readOptions, keyWithPrefix)
+ if (oldValue != null) {
+ val cfInfo = getColumnFamilyInfo(cfName)
+ if (cfInfo.isInternal) {
+ numInternalKeysOnWritingVersion -= 1
+ } else {
+ numKeysOnWritingVersion -= 1
+ }
+ }
+ }
+ } else {
+ if (conf.trackTotalNumberOfRows) {
+ val value = db.get(readOptions, keyWithPrefix)
+ if (value != null) {
+ numKeysOnWritingVersion -= 1
+ }
}
}
- db.delete(writeOptions, key)
- changelogWriter.foreach(_.delete(key))
+
+ db.delete(writeOptions, keyWithPrefix)
+ changelogWriter.foreach(_.delete(keyWithPrefix))
}
/**
Review Comment:
I'm not sure how we missed this (or we were doing prefix scan to cover this
indirectly)...
I don't think iterator() makes sense for multiple column families use case.
The resulting iterator won't give any information about column family, so it's
not even useful for global scan on entire space as well. (Different CFs can
have the same key.)
I think we have to make change on the iterator() to receive cfName to scan
only for certain cf. Please let me know if you think we can't.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -931,7 +941,8 @@ case class RocksDBCheckpointMetadata(
sstFiles: Seq[RocksDBSstFile],
logFiles: Seq[RocksDBLogFile],
numKeys: Long,
- columnFamilyMapping: Option[Map[String, Short]] = None,
+ numInternalKeys: Long,
+ columnFamilyMapping: Option[Map[String, ColumnFamilyInfo]] = None,
Review Comment:
If we didn't have the test for compatibility then we probably should have it.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -723,20 +920,43 @@ class RocksDB(
iter.seekToFirst()
var keys = 0L
- while (iter.isValid) {
- keys += 1
- iter.next()
+ var internalKeys = 0L
+
+ if (!useColumnFamilies) {
+ while (iter.isValid) {
+ keys += 1
+ iter.next()
+ }
+ } else {
+ while (iter.isValid) {
+ val (_, cfName) = decodeStateRowWithPrefix(iter.key)
+ val cfInfo = getColumnFamilyInfo(cfName)
Review Comment:
nit: We could do some micro optimization here - all keys in the same CF will
(and must) appear sequentially, so it is a waste of time to perform lookup from
the map for every key. We can have current cf with internal flag (or just
ColumnFamilyInfo) as variables, and leverage them.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -621,28 +690,106 @@ class RocksDB(
}
}
+ /**
+ * Function to encode state row with virtual col family id prefix
+ * @param data - passed byte array to be stored in state store
+ * @param cfName - name of column family
+ * @return - encoded byte array with virtual column family id prefix
+ */
+ private def encodeStateRowWithPrefix(
Review Comment:
Nvm if you just moved the existing method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]