brkyvz commented on code in PR #48944:
URL: https://github.com/apache/spark/pull/48944#discussion_r1887799446


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -60,15 +60,92 @@ sealed trait RocksDBValueStateEncoder {
  * by the callers. The metadata in each row does not need to be written as 
Avro or UnsafeRow,
  * but the actual data provided by the caller does.
  */
+/** Interface for encoding and decoding state store data between UnsafeRow and 
raw bytes.
+ *
+ * @note All encode methods expect non-null input rows. Handling of null 
values is left to the
+ * implementing classes.
+ */
 trait DataEncoder {
+  /** Encodes a complete key row into bytes. Used as the primary key for state 
lookups.
+   *
+   * @param row An UnsafeRow containing all key columns as defined in the 
keySchema
+   * @return Serialized byte array representation of the key
+   */
   def encodeKey(row: UnsafeRow): Array[Byte]
+
+  /** Encodes the non-prefix portion of a key row. Used with prefix scan and

Review Comment:
   Text should start at the second line, not the first in scaladoc



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -356,6 +376,14 @@ case class PrefixKeyScanStateEncoderSpec(
   if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
     throw 
StateStoreErrors.incorrectNumOrderingColsForPrefixScan(numColsPrefixKey.toString)
   }
+  override def toEncoder(

Review Comment:
   nit: new line please



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -789,44 +940,58 @@ abstract class RocksDBKeyStateEncoderBase(
   }
 }
 
+/** Factory object for creating state encoders used by RocksDB state store.

Review Comment:
   move text to next line



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -402,6 +479,80 @@ class AvroStateEncoder(
 
   private lazy val remainingKeyAvroProjection = 
UnsafeProjection.create(remainingKeySchema)
 
+

Review Comment:
   uber nit: delete extra empty lines



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -402,6 +479,80 @@ class AvroStateEncoder(
 
   private lazy val remainingKeyAvroProjection = 
UnsafeProjection.create(remainingKeySchema)
 
+
+
+  private def getAvroSerializer(schema: StructType): AvroSerializer = {
+    val avroType = SchemaConverters.toAvroType(schema)
+    new AvroSerializer(schema, avroType, nullable = false)
+  }
+
+  private def getAvroDeserializer(schema: StructType): AvroDeserializer = {
+    val avroType = SchemaConverters.toAvroType(schema)
+    val avroOptions = AvroOptions(Map.empty)
+    new AvroDeserializer(avroType, schema,
+      avroOptions.datetimeRebaseModeInRead, 
avroOptions.useStableIdForUnionType,
+      avroOptions.stableIdPrefixForUnionType, 
avroOptions.recursiveFieldMaxDepth)
+  }
+
+  /**
+   * Creates an AvroEncoder that handles both key and value 
serialization/deserialization.
+   * This method sets up the complete encoding infrastructure needed for state 
store operations.
+   *
+   * The encoder handles different key encoding specifications:
+   * - NoPrefixKeyStateEncoderSpec: Simple key encoding without prefix
+   * - PrefixKeyScanStateEncoderSpec: Keys with prefix for efficient scanning
+   * - RangeKeyScanStateEncoderSpec: Keys with ordering requirements for range 
scans
+   *
+   * For prefix scan cases, it also creates separate encoders for the suffix 
portion of keys.
+   *
+   * @param keyStateEncoderSpec Specification for how to encode keys
+   * @param valueSchema Schema for the values to be encoded
+   * @return An AvroEncoder containing all necessary serializers and 
deserializers
+   */
+  private def createAvroEnc(
+                             keyStateEncoderSpec: KeyStateEncoderSpec,
+                             valueSchema: StructType
+                           ): AvroEncoder = {

Review Comment:
   please fix the indent



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -325,6 +325,18 @@ sealed trait KeyStateEncoderSpec {
   def keySchema: StructType
   def jsonValue: JValue
   def json: String = compact(render(jsonValue))
+
+  /** Creates a RocksDBKeyStateEncoder for this specification.

Review Comment:
   ditto on scaladoc



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -60,15 +60,92 @@ sealed trait RocksDBValueStateEncoder {
  * by the callers. The metadata in each row does not need to be written as 
Avro or UnsafeRow,
  * but the actual data provided by the caller does.
  */
+/** Interface for encoding and decoding state store data between UnsafeRow and 
raw bytes.
+ *
+ * @note All encode methods expect non-null input rows. Handling of null 
values is left to the
+ * implementing classes.
+ */

Review Comment:
   is this in the right place?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala:
##########
@@ -126,8 +126,7 @@ class SessionGroupsStatefulProcessorWithTTL extends
  * Test suite to verify integration of state data source reader with the 
transformWithState operator
  */
 class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest
-  with AlsoTestWithEncodingTypes
-  with AlsoTestWithChangelogCheckpointingEnabled {

Review Comment:
   did you lose the `AlsoTestWithChangelogCheckpointingEnabled`? Does 
`AlsoTestWithRocksDBFeatures` contain that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to