anishshri-db commented on code in PR #47107:
URL: https://github.com/apache/spark/pull/47107#discussion_r1664950720
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -392,12 +466,135 @@ private[sql] class RocksDBStateStoreProvider
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}
+
/**
 * Utility object for column-family-related helpers: verification of column
 * family names, validation of column family operations, etc.
 */
+ private object ColumnFamilyUtils {
// Reason string passed to StateStoreErrors.unsupportedOperationException when a
// non-default column family is used while useColumnFamilies is false.
private val multColFamiliesDisabledStr =
  "multiple column families is disabled in RocksDBStateStoreProvider"
+
/**
 * Encodes a virtual column family id as a freshly allocated byte array of
 * VIRTUAL_COL_FAMILY_PREFIX_BYTES bytes, with the id written via Platform.putShort
 * starting at the array's first element.
 *
 * NOTE(review): presumably VIRTUAL_COL_FAMILY_PREFIX_BYTES equals the size of a
 * Short (2); any extra bytes would be left zero - confirm.
 *
 * @param id - id of the column family
 * @return - a new array holding the encoded id
 */
def getVcfIdBytes(id: Short): Array[Byte] = {
  val prefix = Array.ofDim[Byte](VIRTUAL_COL_FAMILY_PREFIX_BYTES)
  // BYTE_ARRAY_OFFSET addresses the first element of the byte array.
  Platform.putShort(prefix, Platform.BYTE_ARRAY_OFFSET, id)
  prefix
}
+
/**
 * Validates the column family targeted by a store operation such as get, put or
 * remove.
 *
 * The default column family is always accepted. Any other name is rejected when
 * multiple column families are disabled, when the name is empty or has
 * leading/trailing whitespace, or when no such column family exists.
 *
 * @param operationName - name of the store operation, forwarded to the
 *                        StateStoreErrors builders
 * @param colFamilyName - name of the column family
 */
def verifyColFamilyOperations(
    operationName: String,
    colFamilyName: String): Unit = {
  val isDefaultColFamily = colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME
  if (!isDefaultColFamily) {
    // Non-default column families require multiple column family support.
    if (!useColumnFamilies) {
      throw StateStoreErrors.unsupportedOperationException(operationName,
        multColFamiliesDisabledStr)
    }

    // Reject empty names and names with leading/trailing whitespace.
    val hasInvalidName = colFamilyName.isEmpty || colFamilyName.trim != colFamilyName
    if (hasInvalidName) {
      throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName,
        colFamilyName)
    }

    // The column family must already have been created.
    if (!checkColFamilyExists(colFamilyName)) {
      throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
        colFamilyName)
    }
  }
}
+
/**
 * Function to verify invariants for column family creation or deletion operations.
 *
 * Throws when multiple column families are disabled; when the name is empty, has
 * leading/trailing whitespace, or is the reserved default column family name; and,
 * unless `isInternal` is set, when the name starts with the reserved '_' prefix.
 *
 * @param operationName - name of the store operation
 * @param colFamilyName - name of the column family
 * @param isInternal - true when the caller manages an internal column family, which
 *                     is allowed to use the reserved '_' prefix
 */
private def verifyColFamilyCreationOrDeletion(
    operationName: String,
    colFamilyName: String,
    isInternal: Boolean = false): Unit = {
  // if the state store instance does not support multiple column families, throw an exception
  if (!useColumnFamilies) {
    throw StateStoreErrors.unsupportedOperationException(operationName,
      multColFamiliesDisabledStr)
  }

  // if the column family name is empty or contains leading/trailing whitespaces
  // or using the reserved "default" column family, throw an exception
  if (colFamilyName.isEmpty
    || colFamilyName.trim != colFamilyName
    || colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
    throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName,
      colFamilyName)
  }

  // Only internal column families may use the reserved '_' prefix. Delegating to
  // checkInternalColumnFamilies keeps the prefix rule defined in one place; the
  // name is known to be non-empty here.
  if (!isInternal && checkInternalColumnFamilies(colFamilyName)) {
    throw StateStoreErrors.cannotCreateColumnFamilyWithReservedChars(colFamilyName)
  }
}
+
/**
 * Check whether the column family name is for internal column families.
 *
 * Internal column families are marked by a leading '_' character.
 *
 * @param cfName - column family name
 * @return - true if the column family is for internal use, false otherwise
 *           (including for an empty name, which previously threw
 *           StringIndexOutOfBoundsException via charAt(0))
 */
def checkInternalColumnFamilies(cfName: String): Boolean = cfName.startsWith("_")
+
/**
 * Registers a column family, if not registered already.
 *
 * After validating the name, allocates the next id from colFamilyId and records
 * the name -> id mapping in colFamilyNameToIdMap when the name is not yet present.
 *
 * NOTE(review): the counter value is narrowed with `.toShort`, so ids wrap once the
 * counter exceeds Short.MaxValue. Confirm that the number of column families is bounded.
 * NOTE(review): the existence check and putIfAbsent are separate steps. With
 * concurrent callers an id could be allocated without being stored. Confirm
 * whether concurrent use is possible.
 *
 * @param colFamilyName - name of the column family
 * @param isInternal - true to allow the reserved '_' prefix for internal column families
 */
def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean = false): Unit = {
  verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName, isInternal)
  val alreadyRegistered = checkColFamilyExists(colFamilyName)
  if (!alreadyRegistered) {
    val newId = colFamilyId.incrementAndGet().toShort
    colFamilyNameToIdMap.putIfAbsent(colFamilyName, newId)
  }
}
+
/**
 * Unregisters a column family, if it exists.
 *
 * NOTE(review): `isInternal` is not forwarded to the verification step, so names
 * starting with the reserved '_' prefix are rejected here. Confirm that this is intended.
 *
 * @param colFamilyName - name of the column family
 * @return - true if the column family was present and its mapping was removed,
 *           false if it did not exist
 */
def removeColFamilyIfExists(colFamilyName: String): Boolean = {
  verifyColFamilyCreationOrDeletion("remove_col_family", colFamilyName)
  val wasPresent = checkColFamilyExists(colFamilyName)
  if (wasPresent) {
    colFamilyNameToIdMap.remove(colFamilyName)
  }
  wasPresent
}
+
/**
 * Function to check if the column family exists in the state store instance.
 *
 * @param colFamilyName - name of the column family
 * @return - true if the column family exists, false otherwise
 */
def checkColFamilyExists(colFamilyName: String): Boolean = {
  // Direct hash lookup. The previous keys.asScala.toSeq.contains copied every key
  // into a Seq on each call, and this runs on every non-default-family operation.
  colFamilyNameToIdMap.containsKey(colFamilyName)
Review Comment:
I guess we could use `containsKey` here instead of converting all keys into a `Seq` on every call?
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#containsKey-java.lang.Object-
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]