anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1531410457
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
override def supportPrefixKeyScan: Boolean = true
}
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size
fields
+ * Note that for range scan, we have to encode the ordering columns using
BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise
comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of leading key columns used for ordering (range scan)
+ */
+class RangeKeyScanStateEncoder(
+ keySchema: StructType,
+ numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+ import RocksDBStateEncoder._
+
+ // Verify that num cols specified for ordering are valid
+ // Note that ordering cols can be equal to number of key schema columns
+ if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+ throw
StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+ }
+
+ private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+ keySchema.zipWithIndex.take(numOrderingCols)
+ }
+
+ private def isFixedSize(dataType: DataType): Boolean = dataType match {
+ case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _:
LongType |
+ _: FloatType | _: DoubleType => true
+ case _ => false
+ }
+
+ // verify that only fixed sized columns are used for ordering
+ rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+ if (!isFixedSize(field.dataType)) {
+ throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name,
idx.toString)
+ }
+ }
+
Review Comment:
Done
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -41,11 +43,18 @@ sealed trait RocksDBValueStateEncoder {
object RocksDBStateEncoder {
def getKeyEncoder(
keySchema: StructType,
- numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
- if (numColsPrefixKey > 0) {
- new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
- } else {
- new NoPrefixKeyStateEncoder(keySchema)
+ keyStateEncoderType: KeyStateEncoderType = NoPrefixKeyStateEncoderType,
+ numColsPrefixKey: Int = 0): RocksDBKeyStateEncoder = {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]