sahnib commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1530503577
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -41,11 +43,18 @@ sealed trait RocksDBValueStateEncoder {
object RocksDBStateEncoder {
def getKeyEncoder(
keySchema: StructType,
- numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
- if (numColsPrefixKey > 0) {
- new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
- } else {
- new NoPrefixKeyStateEncoder(keySchema)
+ keyStateEncoderType: KeyStateEncoderType = NoPrefixKeyStateEncoderType,
+ numColsPrefixKey: Int = 0): RocksDBKeyStateEncoder = {
Review Comment:
`numColsPrefixKey` now applies to both the RangeScan and PrefixKey encoders. I
think the name `numColsPrefixKey` is somewhat confusing because it does not
convey that it also applies to RangeScan.
In a way, RangeScan relies on building a prefix key whose encoding scheme
preserves ordering, and then performing range operations on it.
[This is probably a large change but worth considering] Should we create a
KeyEncoderSpec trait that allows the user to provide the required encoder spec
as PrefixKeyEncoderSpec, NoPrefixKeyEncoderSpec, or RangeScanKeyEncoderSpec?
Each corresponding object can hold fields like `numColsPrefixKey`, and we can
case match on it. This would let us keep the extra information required by each
encoder inside the corresponding Spec object, rather than adding all these
fields to the API. A rough sketch of the idea follows.
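
For illustration, a minimal sketch of the idea; the spec names follow the comment above, while the `getKeyEncoder` shape and everything else here is assumed rather than taken from this PR:

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical spec hierarchy: each spec carries only the fields its encoder needs.
sealed trait KeyEncoderSpec
case class NoPrefixKeyEncoderSpec(keySchema: StructType) extends KeyEncoderSpec
case class PrefixKeyEncoderSpec(keySchema: StructType, numColsPrefixKey: Int)
  extends KeyEncoderSpec
case class RangeScanKeyEncoderSpec(keySchema: StructType, numOrderingCols: Int)
  extends KeyEncoderSpec

object RocksDBStateEncoder {
  // Case match on the spec instead of threading encoder-specific fields through the API.
  def getKeyEncoder(spec: KeyEncoderSpec): RocksDBKeyStateEncoder = spec match {
    case NoPrefixKeyEncoderSpec(schema) =>
      new NoPrefixKeyStateEncoder(schema)
    case PrefixKeyEncoderSpec(schema, numColsPrefixKey) =>
      new PrefixKeyScanStateEncoder(schema, numColsPrefixKey)
    case RangeScanKeyEncoderSpec(schema, numOrderingCols) =>
      new RangeKeyScanStateEncoder(schema, numOrderingCols)
  }
}
```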
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
override def supportPrefixKeyScan: Boolean = true
}
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of leading columns used for range scan ordering
+ */
+class RangeKeyScanStateEncoder(
+ keySchema: StructType,
+ numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+ import RocksDBStateEncoder._
+
+ // Verify that num cols specified for ordering are valid
+ // Note that ordering cols can be equal to number of key schema columns
+ if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+ throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+ }
+
+ private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+ keySchema.zipWithIndex.take(numOrderingCols)
+ }
+
+ private def isFixedSize(dataType: DataType): Boolean = dataType match {
+ case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+ _: FloatType | _: DoubleType => true
+ case _ => false
+ }
+
+ // verify that only fixed sized columns are used for ordering
+ rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+ if (!isFixedSize(field.dataType)) {
+ throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+ }
+ }
+
+ private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+ keySchema.zipWithIndex.drop(numOrderingCols)
+ }
+
+ private val rangeScanKeyProjection: UnsafeProjection = {
+ val refs = rangeScanKeyFieldsWithIdx.map(x =>
+ BoundReference(x._2, x._1.dataType, x._1.nullable))
+ UnsafeProjection.create(refs)
+ }
+
+ private val remainingKeyProjection: UnsafeProjection = {
+ val refs = remainingKeyFieldsWithIdx.map(x =>
+ BoundReference(x._2, x._1.dataType, x._1.nullable))
+ UnsafeProjection.create(refs)
+ }
+
+ private val restoreKeyProjection: UnsafeProjection = UnsafeProjection.create(keySchema)
+
+ // Reusable objects
+ private val joinedRowOnKey = new JoinedRow()
+
+ private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+ rangeScanKeyProjection(key)
+ }
+
+ // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
+ // using byte arrays.
+ private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+ val writer = new UnsafeRowWriter(numOrderingCols)
+ writer.resetRowWriter()
+ rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+ val value = row.get(idx, field.dataType)
+ field.dataType match {
+ // endian-ness doesn't matter for single byte objects. so just write these
+ // types directly.
+ case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
+ case ByteType => writer.write(idx, value.asInstanceOf[Byte])
+
+ // for other multi-byte types, we need to convert to big-endian
+ case ShortType =>
+ val bbuf = ByteBuffer.allocate(2)
Review Comment:
[nit] we can use `field.dataType.defaultSize` rather than hard-coding it
here.
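
For concreteness, a self-contained sketch of what the nit suggests; the helper name `bigEndianBuffer` is made up for illustration, but `DataType.defaultSize` is the real field being referred to:

```scala
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.types.{DataType, IntegerType, ShortType}

// Allocate the buffer from the type's default size instead of a hard-coded literal,
// e.g. ShortType.defaultSize == 2, IntegerType.defaultSize == 4.
def bigEndianBuffer(dataType: DataType): ByteBuffer =
  ByteBuffer.allocate(dataType.defaultSize).order(ByteOrder.BIG_ENDIAN)

val shortBytes: Array[Byte] = bigEndianBuffer(ShortType).putShort(258.toShort).array()
val intBytes: Array[Byte] = bigEndianBuffer(IntegerType).putInt(258).array()
```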
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -305,9 +307,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName)
}
- require((keySchema.length == 0 && numColsPrefixKey == 0) ||
- (keySchema.length > numColsPrefixKey), "The number of columns in the key must be " +
Review Comment:
We still want to validate that `keySchema` has more fields than
`numColsPrefixKey`; a rough sketch of such a check is below.
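
A minimal sketch of the check being asked for, modeled on the `require` that this diff removes (the exact message is illustrative):

```scala
// Keep rejecting configurations where the prefix-key column count does not leave
// at least one remaining key column (an empty key schema with 0 prefix cols is allowed).
require(
  (keySchema.length == 0 && numColsPrefixKey == 0) || keySchema.length > numColsPrefixKey,
  s"The number of columns in the key must be greater than numColsPrefixKey " +
    s"(keySchema.length=${keySchema.length}, numColsPrefixKey=$numColsPrefixKey)")
```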
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -41,11 +43,18 @@ sealed trait RocksDBValueStateEncoder {
object RocksDBStateEncoder {
def getKeyEncoder(
keySchema: StructType,
- numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
- if (numColsPrefixKey > 0) {
- new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
- } else {
- new NoPrefixKeyStateEncoder(keySchema)
+ keyStateEncoderType: KeyStateEncoderType = NoPrefixKeyStateEncoderType,
+ numColsPrefixKey: Int = 0): RocksDBKeyStateEncoder = {
Review Comment:
I think we should not default `numColsPrefixKey` to zero here, since this
object does not perform any validation before creating the key encoder. Both
the prefix key and range scan encoders need `numColsPrefixKey > 0`; one
possible validation is sketched below.
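
One possible shape of that validation inside `getKeyEncoder`, sketched here; `RangeKeyScanStateEncoderType` and the exact error handling are assumptions rather than the PR's code:

```scala
def getKeyEncoder(
    keySchema: StructType,
    keyStateEncoderType: KeyStateEncoderType,
    numColsPrefixKey: Int): RocksDBKeyStateEncoder = keyStateEncoderType match {
  case NoPrefixKeyStateEncoderType =>
    new NoPrefixKeyStateEncoder(keySchema)
  case PrefixKeyScanStateEncoderType =>
    // Fail fast instead of silently accepting a defaulted 0.
    require(numColsPrefixKey > 0, "prefix key scan requires numColsPrefixKey > 0")
    new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
  case RangeKeyScanStateEncoderType =>
    require(numColsPrefixKey > 0, "range scan requires a positive number of ordering cols")
    new RangeKeyScanStateEncoder(keySchema, numColsPrefixKey)
}
```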
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##########
@@ -85,13 +85,13 @@ class TimerStateImpl(
}
val keyToTsCFName = timerCFName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
- store.createColFamilyIfAbsent(keyToTsCFName,
+ store.createColFamilyIfAbsent(keyToTsCFName, PrefixKeyScanStateEncoderType,
Review Comment:
We want to use RangeScan for State v2 timers, right?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
override def supportPrefixKeyScan: Boolean = true
}
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of leading columns used for range scan ordering
+ */
+class RangeKeyScanStateEncoder(
+ keySchema: StructType,
+ numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+ import RocksDBStateEncoder._
+
+ // Verify that num cols specified for ordering are valid
+ // Note that ordering cols can be equal to number of key schema columns
+ if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+ throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+ }
+
+ private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+ keySchema.zipWithIndex.take(numOrderingCols)
+ }
+
+ private def isFixedSize(dataType: DataType): Boolean = dataType match {
+ case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+ _: FloatType | _: DoubleType => true
+ case _ => false
+ }
+
+ // verify that only fixed sized columns are used for ordering
+ rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+ if (!isFixedSize(field.dataType)) {
+ throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+ }
+ }
+
Review Comment:
[nit] Can we keep all input validations in one place, for easier reading in the
future? Maybe also put them in the class constructor (it makes no difference
technically, but it makes it easier to follow what happens at construction
time). A rough sketch is below.
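
A minimal sketch of the grouping being suggested, reusing the two checks from the diff above; the `validateInput` helper name is made up for illustration:

```scala
// Group all constructor-time input validation so it is visible, in one place,
// which inputs are rejected when the encoder is instantiated.
private def validateInput(): Unit = {
  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
  }
  keySchema.zipWithIndex.take(numOrderingCols).foreach { case (field, idx) =>
    if (!isFixedSize(field.dataType)) {
      throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
    }
  }
}

validateInput()
```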
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -17,16 +17,18 @@
package org.apache.spark.sql.execution.streaming.state
+import java.nio.{ByteBuffer, ByteOrder}
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES, STATE_ENCODING_VERSION}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}
Review Comment:
[nit] Maybe just `import org.apache.spark.sql.types._` is better.