anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1530919219
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
override def supportPrefixKeyScan: Boolean = true
}
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size
fields
+ * Note that for range scan, we have to encode the ordering columns using
BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise
comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of columns to be used for range scan ordering
+ */
+class RangeKeyScanStateEncoder(
+ keySchema: StructType,
+ numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+ import RocksDBStateEncoder._
+
+ // Verify that num cols specified for ordering are valid
+ // Note that ordering cols can be equal to number of key schema columns
+ if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+ throw
StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+ }
+
+ private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+ keySchema.zipWithIndex.take(numOrderingCols)
+ }
+
+ private def isFixedSize(dataType: DataType): Boolean = dataType match {
+ case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _:
LongType |
+ _: FloatType | _: DoubleType => true
+ case _ => false
+ }
+
+ // verify that only fixed size columns are used for ordering
+ rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+ if (!isFixedSize(field.dataType)) {
+ throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name,
idx.toString)
+ }
+ }
+
+ private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+ keySchema.zipWithIndex.drop(numOrderingCols)
+ }
+
+ private val rangeScanKeyProjection: UnsafeProjection = {
+ val refs = rangeScanKeyFieldsWithIdx.map(x =>
+ BoundReference(x._2, x._1.dataType, x._1.nullable))
+ UnsafeProjection.create(refs)
+ }
+
+ private val remainingKeyProjection: UnsafeProjection = {
+ val refs = remainingKeyFieldsWithIdx.map(x =>
+ BoundReference(x._2, x._1.dataType, x._1.nullable))
+ UnsafeProjection.create(refs)
+ }
+
+ private val restoreKeyProjection: UnsafeProjection =
UnsafeProjection.create(keySchema)
+
+ // Reusable objects
+ private val joinedRowOnKey = new JoinedRow()
+
+ private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+ rangeScanKeyProjection(key)
+ }
+
+ // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN
encoding
+ // using byte arrays.
+ private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+ val writer = new UnsafeRowWriter(numOrderingCols)
+ writer.resetRowWriter()
+ rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+ val value = row.get(idx, field.dataType)
+ field.dataType match {
+ // endian-ness doesn't matter for single byte objects. so just write
these
+ // types directly.
+ case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
+ case ByteType => writer.write(idx, value.asInstanceOf[Byte])
+
+ // for other multi-byte types, we need to convert to big-endian
+ case ShortType =>
+ val bbuf = ByteBuffer.allocate(2)
Review Comment:
Done
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -17,16 +17,18 @@
package org.apache.spark.sql.execution.streaming.state
+import java.nio.{ByteBuffer, ByteOrder}
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow,
UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES,
STATE_ENCODING_VERSION}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType,
DoubleType, FloatType, IntegerType, LongType, ShortType, StructField,
StructType}
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]