zifeif2 commented on code in PR #53104:
URL: https://github.com/apache/spark/pull/53104#discussion_r2540300182
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -237,6 +245,91 @@ class StatePartitionReader(
}
}
+/**
+ * An implementation of [[StatePartitionReaderBase]] for reading all column families
+ * in binary format. This reader returns raw key and value bytes along with column family names.
+ */
+class StatePartitionReaderAllColumnFamilies(
+ storeConf: StateStoreConf,
+ hadoopConf: SerializableConfiguration,
+ partition: StateStoreInputPartition,
+ schema: StructType)
+ extends StatePartitionReaderBase(storeConf, hadoopConf, partition, schema,
+ NoPrefixKeyStateEncoderSpec(new StructType()), None, None, None, None) {
+
+ val allStateStoreMetadata = {
+ new StateMetadataPartitionReader(
+ partition.sourceOptions.resolvedCpLocation,
+ new SerializableConfiguration(hadoopConf.value),
+ partition.sourceOptions.batchId).stateMetadata.toArray
+ }
+
+ private lazy val store: ReadStateStore = {
+    assert(getStartStoreUniqueId == getEndStoreUniqueId,
+      "Start and end store unique IDs must be the same when reading all column families")
+ provider.getReadStore(
+ partition.sourceOptions.batchId + 1,
+ getStartStoreUniqueId
+ )
+ }
+
+ val colFamilyNames: Seq[String] = {
+ // todo: Support operator with multiple column family names in next PR
Review Comment:
Spark Jira is taking forever to load today... I will add the ticket number in the next version.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]