micheal-o commented on code in PR #53104:
URL: https://github.com/apache/spark/pull/53104#discussion_r2539756121


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -65,28 +65,38 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
     val sourceOptions = StateSourceOptions.modifySourceOptions(hadoopConf,
       StateSourceOptions.apply(session, hadoopConf, properties))
     val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation, 
sourceOptions.batchId)
-    val stateStoreReaderInfo: StateStoreReaderInfo = 
getStoreMetadataAndRunChecks(
-      sourceOptions)
-
-    // The key state encoder spec should be available for all operators except 
stream-stream joins
-    val keyStateEncoderSpec = if 
(stateStoreReaderInfo.keyStateEncoderSpecOpt.isDefined) {
-      stateStoreReaderInfo.keyStateEncoderSpecOpt.get
+    if (sourceOptions.readAllColumnFamilies) {
+      // For readAllColumnFamilies mode, we don't need specific metadata
+      val keyStateEncoderSpec = NoPrefixKeyStateEncoderSpec(new StructType())
+      new StateTable(session, schema, sourceOptions, stateConf, 
keyStateEncoderSpec,

Review Comment:
   We need to know the metadata/schema for the default CF right. Since this is 
only supporting default CF for now.
   
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -65,28 +65,38 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
     val sourceOptions = StateSourceOptions.modifySourceOptions(hadoopConf,
       StateSourceOptions.apply(session, hadoopConf, properties))
     val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation, 
sourceOptions.batchId)
-    val stateStoreReaderInfo: StateStoreReaderInfo = 
getStoreMetadataAndRunChecks(
-      sourceOptions)
-
-    // The key state encoder spec should be available for all operators except 
stream-stream joins
-    val keyStateEncoderSpec = if 
(stateStoreReaderInfo.keyStateEncoderSpecOpt.isDefined) {
-      stateStoreReaderInfo.keyStateEncoderSpecOpt.get
+    if (sourceOptions.readAllColumnFamilies) {
+      // For readAllColumnFamilies mode, we don't need specific metadata
+      val keyStateEncoderSpec = NoPrefixKeyStateEncoderSpec(new StructType())

Review Comment:
   Lets get the encoder from the schema. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -84,13 +88,17 @@ abstract class StatePartitionReaderBase(
   protected val keySchema = {
     if (SchemaUtil.checkVariableType(stateVariableInfoOpt, 
StateVariableType.MapState)) {
       SchemaUtil.getCompositeKeySchema(schema, partition.sourceOptions)
+    } else if (partition.sourceOptions.readAllColumnFamilies) {
+      new StructType().add("keyBytes", BinaryType, nullable = false)
     } else {
       SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
     }
   }
 
   protected val valueSchema = if (stateVariableInfoOpt.isDefined) {
     schemaForValueRow
+  } else if (partition.sourceOptions.readAllColumnFamilies) {
+    new StructType().add("valueBytes", BinaryType, nullable = false)

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -406,6 +418,7 @@ object StateSourceOptions extends DataSourceOptions {
   val STATE_VAR_NAME = newOption("stateVarName")
   val READ_REGISTERED_TIMERS = newOption("readRegisteredTimers")
   val FLATTEN_COLLECTION_TYPES = newOption("flattenCollectionTypes")
+  val READ_ALL_COLUMN_FAMILIES = newOption("readAllColumnFamilies")

Review Comment:
   make this `internalOnlyReadAllColumnFamilies`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -84,13 +88,17 @@ abstract class StatePartitionReaderBase(
   protected val keySchema = {
     if (SchemaUtil.checkVariableType(stateVariableInfoOpt, 
StateVariableType.MapState)) {
       SchemaUtil.getCompositeKeySchema(schema, partition.sourceOptions)
+    } else if (partition.sourceOptions.readAllColumnFamilies) {
+      new StructType().add("keyBytes", BinaryType, nullable = false)

Review Comment:
   This is wrong, this is meant to be the actual key schema. Because this is 
the schema that would be used to load the store



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -237,6 +245,91 @@ class StatePartitionReader(
   }
 }
 
+/**
+ * An implementation of [[StatePartitionReaderBase]] for reading all column 
families
+ * in binary format. This reader returns raw key and value bytes along with 
column family names.
+ */
+class StatePartitionReaderAllColumnFamilies(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    partition: StateStoreInputPartition,
+    schema: StructType)
+  extends StatePartitionReaderBase(storeConf, hadoopConf, partition, schema,
+    NoPrefixKeyStateEncoderSpec(new StructType()), None, None, None, None) {

Review Comment:
   ditto. Use the actual key encoder of the default CF. We need that when 
creating the provider



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -75,7 +75,7 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
   private val providerName = "HDFSBackedStateStoreProvider"
 
   class HDFSBackedReadStateStore(val version: Long, map: 
HDFSBackedStateStoreMap)
-    extends ReadStateStore {
+    extends ReadStateStore with SupportsRawBytesRead {

Review Comment:
   We should throw exception if trying to use this reader with HDFS checkpoint



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -419,6 +420,21 @@ private[sql] class RocksDBStateStoreProvider
       }
     }
 
+    override def rawIterator(colFamilyName: String): Iterator[(Array[Byte], 
Array[Byte])] = {

Review Comment:
   no need to implement this rawIterator, since we need the keyRow too. The 
normal iterator is fine



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -60,6 +61,12 @@ object SchemaUtil {
         .add("key", keySchema)
         .add("value", valueSchema)
         .add("partition_id", IntegerType)
+    } else if (sourceOptions.readAllColumnFamilies) {
+      new StructType()
+        .add("partition_id", IntegerType)

Review Comment:
   This is `partition_key`. And its type will be the actual schema of the 
partition key. For now use the actual key schema as the partition key schema. 
And when you write the row, write the key row as the partition_key for now.
   
   I'm working on the change for specifying the partition key schema and 
extracting the partition key from the key row.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -237,6 +245,91 @@ class StatePartitionReader(
   }
 }
 
+/**
+ * An implementation of [[StatePartitionReaderBase]] for reading all column 
families
+ * in binary format. This reader returns raw key and value bytes along with 
column family names.
+ */
+class StatePartitionReaderAllColumnFamilies(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    partition: StateStoreInputPartition,
+    schema: StructType)
+  extends StatePartitionReaderBase(storeConf, hadoopConf, partition, schema,
+    NoPrefixKeyStateEncoderSpec(new StructType()), None, None, None, None) {
+
+  val allStateStoreMetadata = {
+    new StateMetadataPartitionReader(
+      partition.sourceOptions.resolvedCpLocation,
+      new SerializableConfiguration(hadoopConf.value),
+      partition.sourceOptions.batchId).stateMetadata.toArray
+  }
+
+  private lazy val store: ReadStateStore = {
+    assert(getStartStoreUniqueId == getEndStoreUniqueId,
+      "Start and end store unique IDs must be the same when reading all column 
families")
+    provider.getReadStore(
+      partition.sourceOptions.batchId + 1,
+      getStartStoreUniqueId
+    )
+  }
+
+  val colFamilyNames: Seq[String] = {
+    // todo: Support operator with multiple column family names in next PR
+    Seq[String]()
+  }
+
+  override protected lazy val provider: StateStoreProvider = {
+    val stateStoreId = 
StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
+      partition.sourceOptions.operatorId, partition.partition, 
partition.sourceOptions.storeName)
+    val stateStoreProviderId = StateStoreProviderId(stateStoreId, 
partition.queryId)
+
+    // Disable format validation when reading raw bytes.
+    // We use binary schemas (keyBytes/valueBytes) which don't match the 
actual schema
+    // of the stored data. Validation would fail in 
HDFSBackedStateStoreProvider when
+    // loading data from disk, so we disable it for raw bytes mode.
+    val modifiedStoreConf = storeConf.withFormatValidationDisabled()
+
+    val keyStateEncoderSpec = NoPrefixKeyStateEncoderSpec(keySchema)
+    val provider = StateStoreProvider.createAndInit(
+      stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+      useColumnFamilies = colFamilyNames.nonEmpty, modifiedStoreConf, 
hadoopConf.value, false, None)
+
+    provider
+  }
+
+  override lazy val iter: Iterator[InternalRow] = {
+    // Single store with column families (join v3, transformWithState, or 
simple operators)
+    require(store.isInstanceOf[SupportsRawBytesRead],
+      s"State store ${store.getClass.getName} does not support raw bytes 
reading")
+
+    val rawStore = store.asInstanceOf[SupportsRawBytesRead]
+    if (colFamilyNames.isEmpty) {
+      rawStore
+        .rawIterator()

Review Comment:
   no need to implement raw bytes iterator. Lets use the normal iterator. And 
when you get the UnsafeRow, you can call `row.getBytes` to get its bytes



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -76,6 +83,24 @@ object SchemaUtil {
     row
   }
 
+  /**
+   * Creates a unified row from raw key and value bytes.
+   * This is an alias for unifyStateRowPairAsBytes that takes individual byte 
arrays
+   * instead of a tuple for better readability.
+   */
+  def unifyStateRowPairAsRawBytes(
+     partition: Int,
+     keyBytes: Array[Byte],
+     valueBytes: Array[Byte],
+     colFamilyName: String): InternalRow = {
+    val row = new GenericInternalRow(4)
+    row.update(0, partition)

Review Comment:
   I explained this above, write the key row (not key bytes) for now as the 
partition key.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -237,6 +245,91 @@ class StatePartitionReader(
   }
 }
 
+/**
+ * An implementation of [[StatePartitionReaderBase]] for reading all column 
families
+ * in binary format. This reader returns raw key and value bytes along with 
column family names.
+ */
+class StatePartitionReaderAllColumnFamilies(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    partition: StateStoreInputPartition,
+    schema: StructType)
+  extends StatePartitionReaderBase(storeConf, hadoopConf, partition, schema,
+    NoPrefixKeyStateEncoderSpec(new StructType()), None, None, None, None) {
+
+  val allStateStoreMetadata = {
+    new StateMetadataPartitionReader(
+      partition.sourceOptions.resolvedCpLocation,
+      new SerializableConfiguration(hadoopConf.value),
+      partition.sourceOptions.batchId).stateMetadata.toArray
+  }
+
+  private lazy val store: ReadStateStore = {
+    assert(getStartStoreUniqueId == getEndStoreUniqueId,
+      "Start and end store unique IDs must be the same when reading all column 
families")
+    provider.getReadStore(
+      partition.sourceOptions.batchId + 1,
+      getStartStoreUniqueId
+    )
+  }
+
+  val colFamilyNames: Seq[String] = {
+    // todo: Support operator with multiple column family names in next PR

Review Comment:
   add Spark ticket number for the todo



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -75,7 +75,7 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
   private val providerName = "HDFSBackedStateStoreProvider"
 
   class HDFSBackedReadStateStore(val version: Long, map: 
HDFSBackedStateStoreMap)
-    extends ReadStateStore {
+    extends ReadStateStore with SupportsRawBytesRead {

Review Comment:
   We are not supporting HDFS state store for repartitioning



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala:
##########
@@ -163,6 +163,26 @@ class StateStoreConf(
    */
   val sqlConfs: Map[String, String] =
     
sqlConf.getAllConfs.filter(_._1.startsWith("spark.sql.streaming.stateStore."))
+
+  /**
+   * Creates a copy of this StateStoreConf with format validation disabled.
+   * This is useful when reading raw bytes where the schema used (binary) 
doesn't match
+   * the actual stored data schema.
+   */
+  def withFormatValidationDisabled(): StateStoreConf = {

Review Comment:
   Why?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to