neilramaswamy commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1529271953


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -110,8 +119,11 @@ class PrefixKeyScanStateEncoder(
 
   import RocksDBStateEncoder._
 
-  require(keySchema.length > numColsPrefixKey, "The number of columns in the key must be " +
-    "greater than the number of columns for prefix key!")
+  // Note that numColsPrefixKey have to be less than the number of columns in the key schema
+  // Range scan encoder allows for equality, but prefix key scan encoder does not
+  if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numColsPrefixKey.toString)

Review Comment:
   This error message will say that you can't have _more_ than `keySchema.length` columns
   for the prefix key, but in this particular case the user may not have exceeded that
   limit at all: they may have passed exactly `keySchema.length` columns
   (i.e. `numColsPrefixKey == keySchema.length`), which this encoder also rejects.
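   
   A minimal sketch of what I mean (hypothetical, not the PR's code and not a concrete
   suggestion; it only reuses the existing `StateStoreErrors.incorrectNumOrderingColsNotSupported`
   from the diff, and the comment wording is just illustrative):
   
   ```scala
   // Constraint for the prefix key scan encoder: 0 < numColsPrefixKey < keySchema.length.
   // An error text along the lines of "must be strictly less than the number of key
   // columns" would cover both numColsPrefixKey > keySchema.length and the == case.
   if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
     throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numColsPrefixKey.toString)
   }
   ```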



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -158,14 +160,292 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    // zero ordering cols
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderType, 0, colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "numOrderingCols" -> "0"
+      ),
+      matchPVals = true
+    )
+
+    // ordering cols greater than schema cols
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderType, keySchemaWithRangeScan.length + 1,
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val keySchemaWithVariableSizeCols: StructType = StructType(
+      Seq(StructField("key1", StringType, false), StructField("key2", StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+        RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName, RangeKeyScanStateEncoderType,
+          keySchemaWithRangeScan, numColsPrefixKey = 1, valueSchema)
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)

Review Comment:
   For all of these unit tests, what is the methodology for picking these numbers? I see
   `931` and `8000` and `452300` a lot... is there a significance?
   
   I think a test with more obviously "tricky" numbers would be useful. For example, take
   255 and 256 as longs: under a byte-wise comparison the little-endian encoding of 256
   (`00 01 00 ...`) sorts *before* the little-endian encoding of 255 (`ff 00 00 ...`), even
   though 255 < 256, while the big-endian encodings stay in numeric order. It's possible
   that a case like this (`a < b` but `LE(a) > LE(b)`) exists "accidentally" in the numbers
   we already have, but a simpler, more explicit test could be good.
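   
   A standalone sketch of the kind of inversion I mean (nothing Spark-specific; `encode`
   and `byteWiseLess` are just hypothetical helpers, and the byte-wise order assumed here
   is the unsigned lexicographic order that RocksDB's default comparator uses):
   
   ```scala
   import java.nio.{ByteBuffer, ByteOrder}
   
   // Encode a Long into 8 bytes with the given byte order.
   def encode(v: Long, order: ByteOrder): Array[Byte] =
     ByteBuffer.allocate(8).order(order).putLong(v).array()
   
   // Unsigned lexicographic comparison, i.e. the order a byte-wise comparator sees.
   def byteWiseLess(a: Array[Byte], b: Array[Byte]): Boolean =
     a.zip(b).find { case (x, y) => x != y }
       .map { case (x, y) => (x & 0xff) < (y & 0xff) }
       .getOrElse(a.length < b.length)
   
   // 255 < 256, but the little-endian encodings sort the other way around:
   // LE(256) = 00 01 00 ..., LE(255) = ff 00 00 ...
   val le256 = encode(256L, ByteOrder.LITTLE_ENDIAN)
   val le255 = encode(255L, ByteOrder.LITTLE_ENDIAN)
   assert(byteWiseLess(le256, le255))
   
   // Big-endian encodings keep non-negative longs in numeric order.
   val be256 = encode(256L, ByteOrder.BIG_ENDIAN)
   val be255 = encode(255L, ByteOrder.BIG_ENDIAN)
   assert(byteWiseLess(be255, be256))
   ```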
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numColsPrefixKey - number of columns to be used for prefix key
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |

Review Comment:
   The only other fixed-size type I can think of is `NullType`, but since we control these
   schemas it shouldn't show up here, and if it ever did it would just fall through to the
   error path. Was that your thinking as well?
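   
   (For reference, a hypothetical schema that would exercise that path, assuming `NullType`
   really does fall outside the fixed-size match and lands on the same variable-size error
   as `StringType`; I haven't verified that against the PR:)
   
   ```scala
   import org.apache.spark.sql.types._
   
   // Hypothetical ordering column typed as NullType: it is not listed among the
   // fixed-size types above, so isFixedSize would return false for it and the
   // encoder should reject it just like a StringType ordering column.
   val keySchemaWithNullTypeCol: StructType = StructType(
     Seq(StructField("key1", NullType, true), StructField("key2", LongType, false)))
   ```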



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
