anishshri-db commented on code in PR #47778:
URL: https://github.com/apache/spark/pull/47778#discussion_r1721174715
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -498,116 +492,63 @@ private[sql] class RocksDBStateStoreProvider
}
/**
- * Class for column family related utility functions.
- * Verification functions for column family names, column family operation
validations etc.
+ * Function to verify invariants for column family based operations
+ * such as get, put, remove etc.
+ *
+ * @param operationName - name of the store operation
+ * @param colFamilyName - name of the column family
*/
- private object ColumnFamilyUtils {
- private val multColFamiliesDisabledStr = "multiple column families is
disabled in " +
- "RocksDBStateStoreProvider"
-
- /**
- * Function to verify invariants for column family based operations
- * such as get, put, remove etc.
- *
- * @param operationName - name of the store operation
- * @param colFamilyName - name of the column family
- */
- def verifyColFamilyOperations(
- operationName: String,
- colFamilyName: String): Unit = {
- if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
- // if the state store instance does not support multiple column
families, throw an exception
- if (!useColumnFamilies) {
- throw StateStoreErrors.unsupportedOperationException(operationName,
- multColFamiliesDisabledStr)
- }
-
- // if the column family name is empty or contains leading/trailing
whitespaces, throw an
- // exception
- if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) {
- throw
StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName,
colFamilyName)
- }
-
- // if the column family does not exist, throw an exception
- if (!checkColFamilyExists(colFamilyName)) {
- throw
StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
- colFamilyName)
- }
- }
- }
-
- /**
- * Function to verify invariants for column family creation or deletion
operations.
- *
- * @param operationName - name of the store operation
- * @param colFamilyName - name of the column family
- */
- private def verifyColFamilyCreationOrDeletion(
- operationName: String,
- colFamilyName: String,
- isInternal: Boolean = false): Unit = {
+ private def verifyColFamilyOperations(
+ operationName: String,
+ colFamilyName: String): Unit = {
+ if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
// if the state store instance does not support multiple column
families, throw an exception
if (!useColumnFamilies) {
throw StateStoreErrors.unsupportedOperationException(operationName,
multColFamiliesDisabledStr)
}
- // if the column family name is empty or contains leading/trailing
whitespaces
- // or using the reserved "default" column family, throw an exception
- if (colFamilyName.isEmpty
- || colFamilyName.trim != colFamilyName
- || colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+ // if the column family name is empty or contains leading/trailing
whitespaces, throw an
+ // exception
+ if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) {
throw
StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName,
colFamilyName)
}
- // if the column family is not internal and uses reserved characters,
throw an exception
- if (!isInternal && colFamilyName.charAt(0) == '_') {
- throw
StateStoreErrors.cannotCreateColumnFamilyWithReservedChars(colFamilyName)
+ // if the column family does not exist, throw an exception
+ if (!rocksDB.checkColFamilyExists(colFamilyName)) {
+ throw
StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+ colFamilyName)
}
}
+ }
- /**
- * Check whether the column family name is for internal column families.
- *
- * @param cfName - column family name
- * @return - true if the column family is for internal use, false otherwise
- */
- def checkInternalColumnFamilies(cfName: String): Boolean =
cfName.charAt(0) == '_'
-
- /**
- * Create RocksDB column family, if not created already
- */
- def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean =
false):
- Option[Short] = {
- verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName,
isInternal)
- if (!checkColFamilyExists(colFamilyName)) {
- val newColumnFamilyId = colFamilyId.incrementAndGet().toShort
- colFamilyNameToIdMap.putIfAbsent(colFamilyName, newColumnFamilyId)
- Option(newColumnFamilyId)
- } else None
+ /**
+ * Function to verify invariants for column family creation or deletion
operations.
+ *
+ * @param operationName - name of the store operation
+ * @param colFamilyName - name of the column family
+ */
+ private def verifyColFamilyCreationOrDeletion(
+ operationName: String,
+ colFamilyName: String,
+ isInternal: Boolean = false): Unit = {
+ // if the state store instance does not support multiple column families,
throw an exception
+ if (!useColumnFamilies) {
+ throw StateStoreErrors.unsupportedOperationException(operationName,
+ multColFamiliesDisabledStr)
}
- /**
- * Remove RocksDB column family, if exists
- */
- def removeColFamilyIfExists(colFamilyName: String): Boolean = {
- verifyColFamilyCreationOrDeletion("remove_col_family", colFamilyName)
- if (checkColFamilyExists(colFamilyName)) {
- colFamilyNameToIdMap.remove(colFamilyName)
- true
- } else {
- false
- }
+ // if the column family name is empty or contains leading/trailing
whitespaces
+ // or using the reserved "default" column family, throw an exception
+ if (colFamilyName.isEmpty
+ || colFamilyName.trim != colFamilyName
Review Comment:
Let's use `(` and `)` explicitly?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]