neilramaswamy commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1529271953
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -110,8 +119,11 @@ class PrefixKeyScanStateEncoder(
import RocksDBStateEncoder._
-  require(keySchema.length > numColsPrefixKey, "The number of columns in the key must be " +
-    "greater than the number of columns for prefix key!")
+  // Note that numColsPrefixKey has to be less than the number of columns in the key schema.
+  // The range scan encoder allows for equality, but the prefix key scan encoder does not.
+  if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numColsPrefixKey.toString)
Review Comment:
This error message says that you can't have _more_ than `keySchema.length` columns for the prefix key, but this check also fires when the user asked for exactly that many (i.e. `numColsPrefixKey == keySchema.length`), which is not "more". The message is misleading for that boundary case.
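
To make the boundary case concrete, here's a tiny standalone sketch; the predicate mirrors the guard in the diff above, but the object and method names are illustrative, not from the PR:

```scala
object PrefixKeyGuardDemo {
  // The guard condition from the diff above, extracted as a pure predicate.
  def rejects(numColsPrefixKey: Int, keySchemaLength: Int): Boolean =
    numColsPrefixKey == 0 || numColsPrefixKey >= keySchemaLength

  def main(args: Array[String]): Unit = {
    // Equality trips the check too, so an error message that only says
    // "more than" the number of key columns is misleading for this case.
    assert(rejects(numColsPrefixKey = 2, keySchemaLength = 2))
    assert(!rejects(numColsPrefixKey = 1, keySchemaLength = 2))
    println("numColsPrefixKey == keySchema.length is rejected as well")
  }
}
```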
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -158,14 +160,292 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
+ // zero ordering cols
+ val ex1 = intercept[SparkUnsupportedOperationException] {
+ tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+ RangeKeyScanStateEncoderType, 0, colFamiliesEnabled)) { provider =>
+ provider.getStore(0)
+ }
+ }
+ checkError(
+ ex1,
+ errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+ parameters = Map(
+ "numOrderingCols" -> "0"
+ ),
+ matchPVals = true
+ )
+
+ // ordering cols greater than schema cols
+ val ex2 = intercept[SparkUnsupportedOperationException] {
+ tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+ RangeKeyScanStateEncoderType, keySchemaWithRangeScan.length + 1,
+ colFamiliesEnabled)) { provider =>
+ provider.getStore(0)
+ }
+ }
+ checkError(
+ ex2,
+ errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+ parameters = Map(
+ "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+ ),
+ matchPVals = true
+ )
+ }
+
+ testWithColumnFamilies("rocksdb range scan validation - variable sized
columns",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
+ val keySchemaWithVariableSizeCols: StructType = StructType(
+ Seq(StructField("key1", StringType, false), StructField("key2",
StringType, false)))
+
+ val ex = intercept[SparkUnsupportedOperationException] {
+ tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+ RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+ provider.getStore(0)
+ }
+ }
+ checkError(
+ ex,
+ errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+ parameters = Map(
+ "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+ "index" -> "0"
+ ),
+ matchPVals = true
+ )
+ }
+
+ testWithColumnFamilies("rocksdb range scan - fixed size non-ordering
columns",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
+
+ tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+ RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+ val store = provider.getStore(0)
+
+ // use non-default col family if column families are enabled
+ val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+ if (colFamiliesEnabled) {
+ store.createColFamilyIfAbsent(cfName, RangeKeyScanStateEncoderType,
+ keySchemaWithRangeScan, numColsPrefixKey = 1, valueSchema)
+ }
+
+ val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L,
3L, 35L, 6L, 9L, 5L)
Review Comment:
For all of these unit tests, what is the methodology for picking these numbers? I see `931` and `8000` and `452300` a lot... is there a significance? I think a test with more obviously "tricky" numbers would be useful. For example, take `1` and `256` as 64-bit integers: the little-endian encoding of `256` (`0x00 0x01 0x00 ...`) compares less, byte-wise, than the little-endian encoding of `1` (`0x01 0x00 0x00 ...`), even though `1 < 256`. It's possible that a case like this (`a < b` but `LE(a) > LE(b)`) exists "accidentally" in the numbers we have, but a simpler, more explicit test could be good.
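
A minimal, self-contained sketch of that pitfall (not from the PR; assumes Java 9+ for `java.util.Arrays.compareUnsigned`): big-endian encodings sort like the numbers they encode under byte-wise comparison, little-endian encodings do not.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object EndianOrderingDemo {
  private def encode(v: Long, order: ByteOrder): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).order(order).putLong(v).array()

  // Lexicographic unsigned comparison, like RocksDB's default bytewise comparator.
  private def bytewise(a: Array[Byte], b: Array[Byte]): Int =
    java.util.Arrays.compareUnsigned(a, b)

  def main(args: Array[String]): Unit = {
    // Big-endian: 1L -> 00 00 00 00 00 00 00 01, 256L -> 00 ... 01 00
    assert(bytewise(encode(1L, ByteOrder.BIG_ENDIAN), encode(256L, ByteOrder.BIG_ENDIAN)) < 0)
    // Little-endian: 1L -> 01 00 ..., 256L -> 00 01 ..., so LE(1) > LE(256)
    assert(bytewise(encode(1L, ByteOrder.LITTLE_ENDIAN), encode(256L, ByteOrder.LITTLE_ENDIAN)) > 0)
    println("1 < 256, but LE(1) > LE(256) byte-wise; BIG_ENDIAN preserves order")
  }
}
```

This is exactly why the encoder writes ordering columns in BIG_ENDIAN, as the new scaladoc below notes.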
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
override def supportPrefixKeyScan: Boolean = true
}
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields.
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of ordering columns to be used for range scan
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+ import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
Review Comment:
The only other fixed-size type I can think of is `NullType`, but since we control these schemas it shouldn't appear here, and if it did it would just throw an error. Was that your thinking as well?
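
For reference, a hedged sketch of what the complete check might look like; the diff above is truncated at the review anchor, so the `FloatType`/`DoubleType` arms here are assumptions, not confirmed by the excerpt:

```scala
import org.apache.spark.sql.types._

object FixedSizeCheckSketch {
  // Sketch only: the case arms beyond the excerpt's truncation point
  // (FloatType, DoubleType) are assumed, not confirmed by the diff.
  def isFixedSize(dataType: DataType): Boolean = dataType match {
    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType |
         _: LongType | _: FloatType | _: DoubleType => true
    // NullType is technically fixed-size too, but these schemas are
    // generated internally and should never contain it; anything that
    // falls through here is rejected as a variable-size ordering column.
    case _ => false
  }
}
```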
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]