anishshri-db commented on code in PR #43961:
URL: https://github.com/apache/spark/pull/43961#discussion_r1439537332
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -219,49 +233,127 @@ class RocksDB(
loadedVersion = endVersion
}
+ private def checkColFamilyExists(colFamilyName: String): Boolean = {
+ colFamilyNameToHandleMap.contains(colFamilyName)
+ }
+
+ /**
+ * Create RocksDB column family, if not created already
+ */
+ def createColFamilyIfAbsent(colFamilyName: String): Unit = {
+ if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+ throw new UnsupportedOperationException("Failed to create column family
with reserved " +
+ s"name=$colFamilyName")
+ }
+
+ if (!checkColFamilyExists(colFamilyName)) {
+ assert(db != null)
+ val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes,
columnFamilyOptions)
+ val handle = db.createColumnFamily(descriptor)
+ colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
+ }
+ }
+
/**
* Get the value for the given key if present, or null.
* @note This will return the last written value even if it was uncommitted.
*/
- def get(key: Array[Byte]): Array[Byte] = {
- db.get(readOptions, key)
+ def get(
+ key: Array[Byte],
+ colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte]
= {
+ if (useColumnFamilies) {
+ // if col family is not created, throw an exception
+ if (!checkColFamilyExists(colFamilyName)) {
+ throw new RuntimeException(s"Column family with name=$colFamilyName
does not exist")
+ }
+ db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
+ } else {
+ db.get(readOptions, key)
+ }
}
/**
* Put the given value for the given key.
* @note This update is not committed to disk until commit() is called.
*/
- def put(key: Array[Byte], value: Array[Byte]): Unit = {
- if (conf.trackTotalNumberOfRows) {
- val oldValue = db.get(readOptions, key)
- if (oldValue == null) {
- numKeysOnWritingVersion += 1
+ def put(
+ key: Array[Byte],
+ value: Array[Byte],
+ colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ if (useColumnFamilies) {
+ // if col family is not created, throw an exception
+ if (!checkColFamilyExists(colFamilyName)) {
+ throw new RuntimeException(s"Column family with name=$colFamilyName
does not exist")
+ }
+
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName),
readOptions, key)
+ if (oldValue == null) {
+ numKeysOnWritingVersion += 1
+ }
+ }
+ db.put(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
+ changelogWriter.foreach(_.put(key, value, colFamilyName))
+ } else {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = db.get(readOptions, key)
+ if (oldValue == null) {
+ numKeysOnWritingVersion += 1
+ }
}
+ db.put(writeOptions, key, value)
+ changelogWriter.foreach(_.put(key, value))
}
- db.put(writeOptions, key, value)
- changelogWriter.foreach(_.put(key, value))
}
/**
* Remove the key if present.
* @note This update is not committed to disk until commit() is called.
*/
- def remove(key: Array[Byte]): Unit = {
- if (conf.trackTotalNumberOfRows) {
- val value = db.get(readOptions, key)
- if (value != null) {
- numKeysOnWritingVersion -= 1
+ def remove(
+ key: Array[Byte],
+ colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ if (useColumnFamilies) {
+ // if col family is not created, throw an exception
+ if (!checkColFamilyExists(colFamilyName)) {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]