Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
HeartSaVioR closed pull request #45360: [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families URL: https://github.com/apache/spark/pull/45360 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
HeartSaVioR commented on PR #45360: URL: https://github.com/apache/spark/pull/45360#issuecomment-1990520156 Thanks! Merging to master.
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1520296947

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ##
@@ -582,7 +636,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         assert(iterator(db, colFamily2).isEmpty)
       }
       assert(ex.isInstanceOf[RuntimeException])
-      assert(ex.getMessage.contains("does not exist"))
+      assert(ex.getMessage.contains("missing column family"))

Review Comment:
Done

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala: ##
@@ -134,6 +134,46 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }

+  private def verifyStoreOperationUnsupported()(testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
Done
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1520296736

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ##
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }

+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {

Review Comment:
Done

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ##
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }

+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {
+          db.createColFamilyIfAbsent(colFamilyName)
+        }
+        ex.getCause.isInstanceOf[UnsupportedOperationException]
+      }
+    }
+  }
+
+  private def verifyStoreOperationUnsupported(
+      operationName: String)
+      (testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
Done
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
HeartSaVioR commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1518475424

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ##
@@ -582,7 +636,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         assert(iterator(db, colFamily2).isEmpty)
       }
       assert(ex.isInstanceOf[RuntimeException])
-      assert(ex.getMessage.contains("does not exist"))
+      assert(ex.getMessage.contains("missing column family"))

Review Comment:
ditto

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ##
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }

+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {

Review Comment:
Verifying that an exception belongs to the error class framework needs to be done with checkError.

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ##
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }

+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {
+          db.createColFamilyIfAbsent(colFamilyName)
+        }
+        ex.getCause.isInstanceOf[UnsupportedOperationException]
+      }
+    }
+  }
+
+  private def verifyStoreOperationUnsupported(
+      operationName: String)
+      (testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
ditto

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala: ##
@@ -134,6 +134,46 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }

+  private def verifyStoreOperationUnsupported()(testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
ditto
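To illustrate the review comment about checkError, here is a minimal, dependency-free sketch of the idea: assert on the structured error class and its parameters rather than on a substring of the message. Everything in it is a hypothetical stand-in (`SketchUnsupportedOperationException`, `checkErrorSketch`, `interceptSketch`, and the error class name `SKETCH_UNSUPPORTED_OPERATION`); it is not Spark's actual `checkError` helper or its exception types, whose exact signatures should be taken from the Spark test utilities.

```scala
// Hypothetical stand-in for an exception that carries an error class.
// This is NOT one of Spark's real exception types.
final class SketchUnsupportedOperationException(
    val errorClass: String,
    val messageParameters: Map[String, String])
  extends UnsupportedOperationException(
    s"[$errorClass] " + messageParameters.map { case (k, v) => s"$k=$v" }.mkString(", "))

object CheckErrorSketch {
  // Hypothetical helper: compares structured fields instead of message text,
  // so rewording the message does not break the test.
  def checkErrorSketch(
      exception: SketchUnsupportedOperationException,
      errorClass: String,
      parameters: Map[String, String]): Unit = {
    assert(exception.errorClass == errorClass,
      s"expected error class $errorClass but got ${exception.errorClass}")
    assert(exception.messageParameters == parameters,
      s"expected parameters $parameters but got ${exception.messageParameters}")
  }

  // Minimal replacement for a test framework's intercept, to keep the sketch standalone.
  def interceptSketch(body: => Unit): SketchUnsupportedOperationException =
    try {
      body
      throw new AssertionError("expected an exception but none was thrown")
    } catch {
      case e: SketchUnsupportedOperationException => e
    }

  def main(args: Array[String]): Unit = {
    val ex = interceptSketch {
      throw new SketchUnsupportedOperationException(
        "SKETCH_UNSUPPORTED_OPERATION",
        Map("operationType" -> "get", "entity" -> "someProvider"))
    }
    checkErrorSketch(
      ex,
      errorClass = "SKETCH_UNSUPPORTED_OPERATION",
      parameters = Map("operationType" -> "get", "entity" -> "someProvider"))
  }
}
```

Compared with `assert(ex.getMessage.contains(...))`, this style ties the test to the error class contract rather than to the wording of the message.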
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
HeartSaVioR commented on PR #45360: URL: https://github.com/apache/spark/pull/45360#issuecomment-1985157244 Will review sooner than later. Maybe by today.
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
HyukjinKwon commented on PR #45360: URL: https://github.com/apache/spark/pull/45360#issuecomment-1984994353 Is this good to go? @HeartSaVioR @rangadi
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1514964881

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }

-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")
+      }
+
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }

   /**
    * Create RocksDB column family, if not created already
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+    // Remove leading and trailing whitespaces
+    val cfName = colFamilyName.trim

Review Comment:
Modified to check and throw an exception if some invariants are not met for both the col family operations case and the creation/deletion case.
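The invariant check described in this comment (reject a name instead of silently trimming it) could look roughly like the sketch below. It is an illustration only: `ColFamilyNameCheck`, `validate`, and the rejection reasons are hypothetical names, and `DefaultColFamilyName = "default"` is an assumption inferred from the test inputs quoted in this thread, not a copy of the merged code.

```scala
object ColFamilyNameCheck {
  // Assumed to correspond to StateStore.DEFAULT_COL_FAMILY_NAME; the literal value
  // is inferred from the invalid-name test inputs in this thread.
  val DefaultColFamilyName = "default"

  // Returns Right(name) when the name is acceptable, or Left(reason) when it violates
  // an invariant. The name is never modified, so create and get/put see the same key.
  def validate(name: String): Either[String, String] =
    if (name.isEmpty) Left("name is empty")
    else if (name != name.trim) Left("name has leading or trailing whitespace")
    else if (name == DefaultColFamilyName) Left("name is reserved")
    else Right(name)

  def main(args: Array[String]): Unit = {
    // The same invalid inputs used by the RocksDBSuite test in this PR.
    val invalid = Seq("default", "", " ", "", " default", " default ")
    invalid.foreach(n => assert(validate(n).isLeft, s"expected '$n' to be rejected"))
    assert(validate("cf1") == Right("cf1"))
  }
}
```

Rejecting rather than trimming keeps the stored name and the name used by later operations identical, which is the concern raised earlier in the review.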
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1513631712

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }

-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")
+      }
+
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }

   /**
    * Create RocksDB column family, if not created already
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+    // Remove leading and trailing whitespaces
+    val cfName = colFamilyName.trim

Review Comment:
Hmm - user-facing APIs shouldn't actually call state store operations directly by passing the name. The assumption is that we should never have col families created with leading/trailing spaces. Do you think we should add an assert to verify the same and throw an exception?
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1513624528

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }

-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")

Review Comment:
Done
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
sahnib commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1513594152

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }

-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")
+      }
+
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }

   /**
    * Create RocksDB column family, if not created already
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+    // Remove leading and trailing whitespaces
+    val cfName = colFamilyName.trim

Review Comment:
This trimming also needs to happen when the user issues a put/get etc. against the column family. I don't think that's happening today. In that case, would this lead to inconsistencies?

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }

-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")

Review Comment:
can be reworded as - `multiple column families disabled in RocksDBStateStoreProvider`.
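The inconsistency raised in the first comment can be reproduced with a small standalone sketch. It models the name-to-handle map as a plain mutable map; `TrimMismatchSketch`, `createTrimmed`, and `lookupRaw` are hypothetical names for illustration and do not reflect how `RocksDB.scala` is actually structured beyond what the diff shows.

```scala
import scala.collection.mutable

object TrimMismatchSketch {
  // Stand-in for a name -> handle map; the Int is a placeholder for a real handle.
  private val handles = mutable.Map.empty[String, Int]

  // Creation path trims the name before registering it, as in the diff under review.
  def createTrimmed(name: String): Unit = {
    val key = name.trim
    if (!handles.contains(key)) handles(key) = handles.size
  }

  // Read/write path uses the name exactly as passed, without trimming.
  def lookupRaw(name: String): Option[Int] = handles.get(name)

  def main(args: Array[String]): Unit = {
    createTrimmed(" cf1 ")
    // The caller reuses the exact string it created the family with, and misses.
    assert(lookupRaw(" cf1 ").isEmpty)
    // Only the trimmed form resolves.
    assert(lookupRaw("cf1").contains(0))
  }
}
```

Either trimming on every path or rejecting untrimmed names up front removes the mismatch; the thread settles on validating and throwing.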
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
anishshri-db commented on PR #45360: URL: https://github.com/apache/spark/pull/45360#issuecomment-1974249820 @sahnib @HeartSaVioR - PTAL, thx ! @HeartSaVioR - let me know if you are ok with the proposed dir layout changes and also if you prefer them in a separate PR. Thanks