brkyvz commented on code in PR #48944:
URL: https://github.com/apache/spark/pull/48944#discussion_r1887799446
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -60,15 +60,92 @@ sealed trait RocksDBValueStateEncoder {
 * by the callers. The metadata in each row does not need to be written as Avro or UnsafeRow,
* but the actual data provided by the caller does.
*/
+/** Interface for encoding and decoding state store data between UnsafeRow and raw bytes.
+ *
+ * @note All encode methods expect non-null input rows. Handling of null values is left to the
+ * implementing classes.
+ */
trait DataEncoder {
+ /** Encodes a complete key row into bytes. Used as the primary key for state lookups.
+ *
+ * @param row An UnsafeRow containing all key columns as defined in the keySchema
+ * @return Serialized byte array representation of the key
+ */
def encodeKey(row: UnsafeRow): Array[Byte]
+
+ /** Encodes the non-prefix portion of a key row. Used with prefix scan and
Review Comment:
Text should start on the second line, not the first, in scaladoc.
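For reference, a sketch of the layout being asked for here (doc text copied from the diff; exact wording is up to you):

```scala
/**
 * Interface for encoding and decoding state store data between UnsafeRow and raw bytes.
 *
 * @note All encode methods expect non-null input rows. Handling of null values is left to the
 * implementing classes.
 */
trait DataEncoder {
```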
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -356,6 +376,14 @@ case class PrefixKeyScanStateEncoderSpec(
if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
throw StateStoreErrors.incorrectNumOrderingColsForPrefixScan(numColsPrefixKey.toString)
}
+ override def toEncoder(
Review Comment:
nit: add a blank line before this method, please
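A minimal sketch of the requested formatting, assuming the nit is about a blank line between the preceding guard block and the new method:

```scala
  if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
    throw StateStoreErrors.incorrectNumOrderingColsForPrefixScan(numColsPrefixKey.toString)
  }

  // blank line above separates the guard from the new method
  override def toEncoder(
```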
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -789,44 +940,58 @@ abstract class RocksDBKeyStateEncoderBase(
}
}
+/** Factory object for creating state encoders used by RocksDB state store.
Review Comment:
move the text to the next line (same scaladoc convention as above)
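Sketched for this declaration (object name assumed from the file context; please confirm):

```scala
/**
 * Factory object for creating state encoders used by RocksDB state store.
 */
object RocksDBStateEncoder {
```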
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -402,6 +479,80 @@ class AvroStateEncoder(
private lazy val remainingKeyAvroProjection = UnsafeProjection.create(remainingKeySchema)
+
Review Comment:
uber nit: delete extra empty lines
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -402,6 +479,80 @@ class AvroStateEncoder(
private lazy val remainingKeyAvroProjection = UnsafeProjection.create(remainingKeySchema)
+
+
+ private def getAvroSerializer(schema: StructType): AvroSerializer = {
+ val avroType = SchemaConverters.toAvroType(schema)
+ new AvroSerializer(schema, avroType, nullable = false)
+ }
+
+ private def getAvroDeserializer(schema: StructType): AvroDeserializer = {
+ val avroType = SchemaConverters.toAvroType(schema)
+ val avroOptions = AvroOptions(Map.empty)
+ new AvroDeserializer(avroType, schema,
+ avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType,
+ avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth)
+ }
+
+ /**
+ * Creates an AvroEncoder that handles both key and value serialization/deserialization.
+ * This method sets up the complete encoding infrastructure needed for state store operations.
+ *
+ * The encoder handles different key encoding specifications:
+ * - NoPrefixKeyStateEncoderSpec: Simple key encoding without prefix
+ * - PrefixKeyScanStateEncoderSpec: Keys with prefix for efficient scanning
+ * - RangeKeyScanStateEncoderSpec: Keys with ordering requirements for range scans
+ *
+ * For prefix scan cases, it also creates separate encoders for the suffix portion of keys.
+ *
+ * @param keyStateEncoderSpec Specification for how to encode keys
+ * @param valueSchema Schema for the values to be encoded
+ * @return An AvroEncoder containing all necessary serializers and deserializers
+ */
+ private def createAvroEnc(
+ keyStateEncoderSpec: KeyStateEncoderSpec,
+ valueSchema: StructType
+ ): AvroEncoder = {
Review Comment:
please fix the indent
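For reference, the usual shape in this codebase indents continuation parameters four spaces and keeps the return type on the closing line; a sketch of what I mean (body elided):

```scala
  private def createAvroEnc(
      keyStateEncoderSpec: KeyStateEncoderSpec,
      valueSchema: StructType): AvroEncoder = {
    // ... build the key/value serializers and deserializers ...
  }
```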
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -325,6 +325,18 @@ sealed trait KeyStateEncoderSpec {
def keySchema: StructType
def jsonValue: JValue
def json: String = compact(render(jsonValue))
+
+ /** Creates a RocksDBKeyStateEncoder for this specification.
Review Comment:
ditto on scaladoc
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -60,15 +60,92 @@ sealed trait RocksDBValueStateEncoder {
 * by the callers. The metadata in each row does not need to be written as Avro or UnsafeRow,
 * but the actual data provided by the caller does.
 */
+/** Interface for encoding and decoding state store data between UnsafeRow and raw bytes.
+ *
+ * @note All encode methods expect non-null input rows. Handling of null values is left to the
+ * implementing classes.
+ */
Review Comment:
is this in the right place?
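To spell it out: the diff leaves two back-to-back doc comments, and only the second one ends up attached to `DataEncoder`. A sketch of one way to merge them (exact wording is your call):

```scala
/**
 * Interface for encoding and decoding state store data between UnsafeRow and raw bytes.
 * The metadata in each row does not need to be written as Avro or UnsafeRow, but the
 * actual data provided by the caller does.
 *
 * @note All encode methods expect non-null input rows. Handling of null values is left to the
 * implementing classes.
 */
trait DataEncoder {
```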
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala:
##########
@@ -126,8 +126,7 @@ class SessionGroupsStatefulProcessorWithTTL extends
 * Test suite to verify integration of state data source reader with the transformWithState operator
*/
class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest
- with AlsoTestWithEncodingTypes
- with AlsoTestWithChangelogCheckpointingEnabled {
Review Comment:
did you lose the `AlsoTestWithChangelogCheckpointingEnabled`? Does `AlsoTestWithRocksDBFeatures` contain that?
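If `AlsoTestWithRocksDBFeatures` is meant to subsume it, I'd expect something along these lines (hypothetical sketch; please confirm against the actual trait definition in this PR):

```scala
// Hypothetical composition; verify against the real definition.
trait AlsoTestWithRocksDBFeatures
  extends AlsoTestWithEncodingTypes
  with AlsoTestWithChangelogCheckpointingEnabled
```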