HeartSaVioR commented on code in PR #43961:
URL: https://github.com/apache/spark/pull/43961#discussion_r1431945142


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -219,49 +233,127 @@ class RocksDB(
     loadedVersion = endVersion
   }
 
+  private def checkColFamilyExists(colFamilyName: String): Boolean = {
+    colFamilyNameToHandleMap.contains(colFamilyName)
+  }
+
+  /**
+   * Create RocksDB column family, if not created already
+   */
+  def createColFamilyIfAbsent(colFamilyName: String): Unit = {
+    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+      throw new UnsupportedOperationException("Failed to create column family 
with reserved " +
+        s"name=$colFamilyName")
+    }
+
+    if (!checkColFamilyExists(colFamilyName)) {
+      assert(db != null)
+      val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes, 
columnFamilyOptions)
+      val handle = db.createColumnFamily(descriptor)
+      colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
+    }
+  }
+
   /**
    * Get the value for the given key if present, or null.
    * @note This will return the last written value even if it was uncommitted.
    */
-  def get(key: Array[Byte]): Array[Byte] = {
-    db.get(readOptions, key)
+  def get(
+      key: Array[Byte],
+      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] 
= {
+    if (useColumnFamilies) {
+      // if col family is not created, throw an exception
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw new RuntimeException(s"Column family with name=$colFamilyName 
does not exist")
+      }
+      db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
+    } else {
+      db.get(readOptions, key)
+    }
   }
 
   /**
    * Put the given value for the given key.
    * @note This update is not committed to disk until commit() is called.
    */
-  def put(key: Array[Byte], value: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val oldValue = db.get(readOptions, key)
-      if (oldValue == null) {
-        numKeysOnWritingVersion += 1
+  def put(
+      key: Array[Byte],
+      value: Array[Byte],
+      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    if (useColumnFamilies) {
+      // if col family is not created, throw an exception
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw new RuntimeException(s"Column family with name=$colFamilyName 
does not exist")
+      }
+
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), 
readOptions, key)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }
+      }
+      db.put(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
+      changelogWriter.foreach(_.put(key, value, colFamilyName))
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, key)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }

Review Comment:
   There seem to be several viable approaches to deduplicating this code: 1) 
consider adding the default CF to colFamilyNameToHandleMap and doing the lookup, 
or 2) create private methods which receive a "handle" and let the caller figure 
out the handle.
   
   There are already redundant lookups against the map if trackTotalNumberOfRows 
is enabled - the lookup result (handle) can simply be stored in a local val so 
that the lookup is done only once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to