sahnib commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1530503577


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -41,11 +43,18 @@ sealed trait RocksDBValueStateEncoder {
 object RocksDBStateEncoder {
   def getKeyEncoder(
       keySchema: StructType,
-      numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
-    if (numColsPrefixKey > 0) {
-      new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
-    } else {
-      new NoPrefixKeyStateEncoder(keySchema)
+      keyStateEncoderType: KeyStateEncoderType = NoPrefixKeyStateEncoderType,
+      numColsPrefixKey: Int = 0): RocksDBKeyStateEncoder = {

Review Comment:
   `numColsPrefixKey` now applies to both the RangeScan and PrefixKey encoders. The name `numColsPrefixKey` is somewhat confusing because it does not convey that it also applies to RangeScan.
   
   In a sense, RangeScan also relies on creating a prefix key, with an encoding scheme that preserves ordering so that range operations can be performed on it.
   
   [This is probably a large change, but worth considering] Should we create a `KeyEncoderSpec` trait that lets the caller provide the required encoder spec as `PrefixKeyEncoderSpec`, `NoPrefixKeyEncoderSpec`, or `RangeScanKeyEncoderSpec`? Each spec object can carry fields like `numColsPrefixKey`, and we can then case-match on it. This would keep the extra information each encoder requires inside its corresponding spec object, rather than adding all of these fields to the API.
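   A rough sketch of what this could look like (illustrative only; the spec names and the reworked `getKeyEncoder` signature are not part of the PR):
   ```scala
   import org.apache.spark.sql.types.StructType

   // Illustrative shape only.
   sealed trait KeyEncoderSpec
   case class NoPrefixKeyEncoderSpec(keySchema: StructType) extends KeyEncoderSpec
   case class PrefixKeyEncoderSpec(keySchema: StructType, numColsPrefixKey: Int)
     extends KeyEncoderSpec
   case class RangeScanKeyEncoderSpec(keySchema: StructType, numOrderingCols: Int)
     extends KeyEncoderSpec

   // getKeyEncoder would then case-match on the spec instead of taking loose fields.
   def getKeyEncoder(spec: KeyEncoderSpec): RocksDBKeyStateEncoder = spec match {
     case NoPrefixKeyEncoderSpec(schema) =>
       new NoPrefixKeyStateEncoder(schema)
     case PrefixKeyEncoderSpec(schema, numColsPrefixKey) =>
       new PrefixKeyScanStateEncoder(schema, numColsPrefixKey)
     case RangeScanKeyEncoderSpec(schema, numOrderingCols) =>
       new RangeKeyScanStateEncoder(schema, numOrderingCols)
   }
   ```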



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields.
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of columns to be used for range scan ordering
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+    }
+  }
+
+  private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.drop(numOrderingCols)
+  }
+
+  private val rangeScanKeyProjection: UnsafeProjection = {
+    val refs = rangeScanKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val remainingKeyProjection: UnsafeProjection = {
+    val refs = remainingKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val restoreKeyProjection: UnsafeProjection = UnsafeProjection.create(keySchema)
+
+  // Reusable objects
+  private val joinedRowOnKey = new JoinedRow()
+
+  private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+    rangeScanKeyProjection(key)
+  }
+
+  // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
+  // using byte arrays.
+  private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+    val writer = new UnsafeRowWriter(numOrderingCols)
+    writer.resetRowWriter()
+    rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+      val value = row.get(idx, field.dataType)
+      field.dataType match {
+        // endian-ness doesn't matter for single byte objects. so just write these
+        // types directly.
+        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
+        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
+
+        // for other multi-byte types, we need to convert to big-endian
+        case ShortType =>
+          val bbuf = ByteBuffer.allocate(2)

Review Comment:
   [nit] We can use `field.dataType.defaultSize` rather than hard-coding the size here.
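   A minimal sketch of the idea (`bigEndianBuffer` is a hypothetical helper added just for illustration):
   ```scala
   import java.nio.{ByteBuffer, ByteOrder}
   import org.apache.spark.sql.types._

   // ShortType.defaultSize == 2, IntegerType.defaultSize == 4, LongType.defaultSize == 8, etc.,
   // so the buffer size follows the column's data type instead of a hard-coded literal.
   def bigEndianBuffer(dataType: DataType): ByteBuffer =
     ByteBuffer.allocate(dataType.defaultSize).order(ByteOrder.BIG_ENDIAN)
   ```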



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -305,9 +307,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
      throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName)
     }
 
-    require((keySchema.length == 0 && numColsPrefixKey == 0) ||
-      (keySchema.length > numColsPrefixKey), "The number of columns in the key must be " +

Review Comment:
   We still want to validate that `keySchema` has more fields than `numColsPrefixKey`.
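   A minimal sketch of the check to keep (the helper name and error message are illustrative, not part of the PR):
   ```scala
   import org.apache.spark.sql.types.StructType

   // Hypothetical helper; in practice this would likely stay inlined in init().
   def validatePrefixKeyCols(keySchema: StructType, numColsPrefixKey: Int): Unit = {
     require((keySchema.length == 0 && numColsPrefixKey == 0) ||
       keySchema.length > numColsPrefixKey,
       "The number of columns in the key must be greater than numColsPrefixKey")
   }
   ```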



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -41,11 +43,18 @@ sealed trait RocksDBValueStateEncoder {
 object RocksDBStateEncoder {
   def getKeyEncoder(
       keySchema: StructType,
-      numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
-    if (numColsPrefixKey > 0) {
-      new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
-    } else {
-      new NoPrefixKeyStateEncoder(keySchema)
+      keyStateEncoderType: KeyStateEncoderType = NoPrefixKeyStateEncoderType,
+      numColsPrefixKey: Int = 0): RocksDBKeyStateEncoder = {

Review Comment:
   I think we should not give `numColsPrefixKey` a default of zero here, since this object does not perform any validation before creating the key encoder. Both the prefix key and range scan encoders need `numColsPrefixKey > 0`.
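   Something along these lines, for example (a sketch only; `RangeKeyScanStateEncoderType` and the exact match arms are assumptions based on the types introduced in this PR):
   ```scala
   def getKeyEncoder(
       keySchema: StructType,
       keyStateEncoderType: KeyStateEncoderType,
       numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
     keyStateEncoderType match {
       case NoPrefixKeyStateEncoderType =>
         new NoPrefixKeyStateEncoder(keySchema)
       case PrefixKeyScanStateEncoderType =>
         require(numColsPrefixKey > 0, "Prefix key scan requires numColsPrefixKey > 0")
         new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
       case RangeKeyScanStateEncoderType =>
         require(numColsPrefixKey > 0, "Range scan requires numColsPrefixKey > 0")
         new RangeKeyScanStateEncoder(keySchema, numColsPrefixKey)
     }
   }
   ```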



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##########
@@ -85,13 +85,13 @@ class TimerStateImpl(
   }
 
   val keyToTsCFName = timerCFName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
-  store.createColFamilyIfAbsent(keyToTsCFName,
+  store.createColFamilyIfAbsent(keyToTsCFName, PrefixKeyScanStateEncoderType,

Review Comment:
   We want to use RangeScan for State v2 timers, right? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields.
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of columns to be used for range scan ordering
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+    }
+  }
+

Review Comment:
   [nit] Can we keep all input validations in one place for easier reading in the future? Maybe also group them together at the top of the class body (it makes no difference technically, but it makes it easier to follow what happens at construction time).
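   For example (a sketch that just moves the existing checks so they sit together at the top of the class body; the rest of the class is elided):
   ```scala
   class RangeKeyScanStateEncoder(
       keySchema: StructType,
       numOrderingCols: Int) extends RocksDBKeyStateEncoder {

     // All construction-time validation grouped in one place.
     if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
       throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
     }
     keySchema.zipWithIndex.take(numOrderingCols).foreach { case (field, idx) =>
       if (!isFixedSize(field.dataType)) {
         throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
       }
     }

     // ... projections, encode/decode methods, etc. unchanged ...
   }
   ```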



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -17,16 +17,18 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
+import java.nio.{ByteBuffer, ByteOrder}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES, STATE_ENCODING_VERSION}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}

Review Comment:
   [nit] Maybe just `mport org.apache.spark.sql.types._` is better. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

