micheal-o commented on code in PR #53316:
URL: https://github.com/apache/spark/pull/53316#discussion_r2604969529
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -242,14 +269,18 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
private def getStoreMetadataAndRunChecks(sourceOptions: StateSourceOptions): StateStoreReaderInfo = {
val storeMetadata = StateDataSource.getStateStoreMetadata(sourceOptions, hadoopConf)
- runStateVarChecks(sourceOptions, storeMetadata)
+ if (!sourceOptions.internalOnlyReadAllColumnFamilies) {
Review Comment:
Please add a comment explaining why, for future readers.
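For example, something along these lines (just a sketch of the wording):
```
if (!sourceOptions.internalOnlyReadAllColumnFamilies) {
  // State variable checks only apply when reading a single state variable.
  // In read-all-column-families mode there is no specific variable to validate,
  // so we skip them.
  runStateVarChecks(sourceOptions, storeMetadata)
}
```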
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -113,10 +119,17 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
sourceOptions.operatorId, RightSide, oldSchemaFilePaths)
case JoinSideValues.none =>
- // we should have the schema for the state store if joinSide is none
- require(stateStoreReaderInfo.stateStoreColFamilySchemaOpt.isDefined)
- val resultSchema = stateStoreReaderInfo.stateStoreColFamilySchemaOpt.get
- (resultSchema.keySchema, resultSchema.valueSchema)
+ if (isReadAllColFamiliesOnJoinV3(sourceOptions)) {
Review Comment:
Ditto, you wouldn't need this check either.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -81,6 +81,11 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
// The key state encoder spec should be available for all operators except stream-stream joins
val keyStateEncoderSpec = if (stateStoreReaderInfo.keyStateEncoderSpecOpt.isDefined) {
stateStoreReaderInfo.keyStateEncoderSpecOpt.get
+ } else if (isReadAllColFamiliesOnJoinV3(sourceOptions)) {
Review Comment:
If you set this for join v3 in `getStoreMetadataAndRunChecks` the way I mentioned in the comment below, then you won't need this.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -44,14 +48,16 @@ class StatePartitionReaderFactory(
stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
stateSchemaProviderOpt: Option[StateSchemaProvider],
- joinColFamilyOpt: Option[String])
+ joinColFamilyOpt: Option[String],
+ allColumnFamiliesReaderInfo: Option[AllColumnFamiliesReaderInfo])
extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
if (stateStoreInputPartition.sourceOptions.internalOnlyReadAllColumnFamilies) {
new StatePartitionAllColumnFamiliesReader(storeConf, hadoopConf,
- stateStoreInputPartition, schema, keyStateEncoderSpec, stateStoreColFamilySchemaOpt)
+ stateStoreInputPartition, schema, keyStateEncoderSpec,
+ allColumnFamiliesReaderInfo.getOrElse(AllColumnFamiliesReaderInfo()))
Review Comment:
No, it should always be passed in for the all-CF reader mode, right? Let's add a `require` for this here.
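Something like this (a sketch; exact message is up to you):
```
require(allColumnFamiliesReaderInfo.isDefined,
  "allColumnFamiliesReaderInfo must be set when reading all column families")
new StatePartitionAllColumnFamiliesReader(storeConf, hadoopConf,
  stateStoreInputPartition, schema, keyStateEncoderSpec,
  allColumnFamiliesReaderInfo.get)
```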
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -268,13 +299,16 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
if (sourceOptions.readRegisteredTimers) {
stateVarName = TimerStateUtils.getTimerStateVarNames(timeMode)._1
}
-
- val stateVarInfoList = operatorProperties.stateVariables
- .filter(stateVar => stateVar.stateName == stateVarName)
- require(stateVarInfoList.size == 1, s"Failed to find unique state variable info " +
- s"for state variable $stateVarName in operator ${sourceOptions.operatorId}")
- val stateVarInfo = stateVarInfoList.head
- transformWithStateVariableInfoOpt = Some(stateVarInfo)
+ if (!sourceOptions.internalOnlyReadAllColumnFamilies) {
Review Comment:
flip this:
```
if (sourceOptions.internalOnlyReadAllColumnFamilies) {
stateVariableInfos = operatorProperties.stateVariables
} else {
...
}
```
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -131,6 +144,20 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
override def supportsExternalMetadata(): Boolean = false
+ /**
+ * Returns true if this is a read-all-column-families request for a stream-stream join
+ * that uses virtual column families (state format version 3).
+ */
+ private def isReadAllColFamiliesOnJoinV3(sourceOptions: StateSourceOptions): Boolean = {
+ val storeMetadata = StateDataSource.getStateStoreMetadata(sourceOptions, hadoopConf)
Review Comment:
This actually reads the metadata file again from cloud storage. We already read this file in `getStoreMetadataAndRunChecks` in `inferSchema`; we can just pass it in here.
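Rough sketch (assuming the parameter type is whatever `getStateStoreMetadata` already returns, e.g. `Array[StateMetadataTableEntry]`):
```
private def isReadAllColFamiliesOnJoinV3(
    sourceOptions: StateSourceOptions,
    storeMetadata: Array[StateMetadataTableEntry]): Boolean = {
  // same checks as before, minus the extra metadata read
  ...
}
```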
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -29,6 +29,10 @@ import org.apache.spark.sql.types.{NullType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{NextIterator, SerializableConfiguration}
+case class AllColumnFamiliesReaderInfo(
+ colFamilySchemas: List[StateStoreColFamilySchema] = List.empty,
Review Comment:
nit: indentation should be 4 spaces
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -81,25 +87,23 @@ abstract class StatePartitionReaderBase(
extends PartitionReader[InternalRow] with Logging {
// Used primarily as a placeholder for the value schema in the context of
// state variables used within the transformWithState operator.
- private val schemaForValueRow: StructType =
+ private val dummySchema: StructType =
StructType(Array(StructField("__dummy__", NullType)))
protected val keySchema : StructType = {
if (SchemaUtil.checkVariableType(stateVariableInfoOpt,
StateVariableType.MapState)) {
SchemaUtil.getCompositeKeySchema(schema, partition.sourceOptions)
} else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
- require(stateStoreColFamilySchemaOpt.isDefined)
- stateStoreColFamilySchemaOpt.map(_.keySchema).get
+ stateStoreColFamilySchemaOpt.map(_.keySchema).getOrElse(dummySchema)
} else {
SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
}
}
protected val valueSchema : StructType = if (stateVariableInfoOpt.isDefined) {
- schemaForValueRow
+ dummySchema
} else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
- require(stateStoreColFamilySchemaOpt.isDefined)
- stateStoreColFamilySchemaOpt.map(_.valueSchema).get
+ stateStoreColFamilySchemaOpt.map(_.valueSchema).getOrElse(dummySchema)
Review Comment:
ditto
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -258,32 +262,92 @@ class StatePartitionAllColumnFamiliesReader(
partition: StateStoreInputPartition,
schema: StructType,
keyStateEncoderSpec: KeyStateEncoderSpec,
- stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema])
+ allColumnFamiliesReaderInfo: AllColumnFamiliesReaderInfo)
extends StatePartitionReaderBase(
storeConf,
hadoopConf, partition, schema,
- keyStateEncoderSpec, None, stateStoreColFamilySchemaOpt, None, None) {
+ keyStateEncoderSpec, None,
+ allColumnFamiliesReaderInfo.colFamilySchemas.find(
+ _.colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME),
+ None, None) {
Review Comment:
Why aren't you passing the schemaProvider from the factory through to the base?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -301,15 +335,26 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
val storeId = new StateStoreId(stateCheckpointLocation.toString,
sourceOptions.operatorId,
partitionId, sourceOptions.storeName)
val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
- val manager = new StateSchemaCompatibilityChecker(providerId, hadoopConf,
- oldSchemaFilePaths = oldSchemaFilePaths)
+ val manager = new StateSchemaCompatibilityChecker(
+ providerId, hadoopConf, oldSchemaFilePaths)
val stateSchema = manager.readSchemaFile()
- // Based on the version and read schema, populate the keyStateEncoderSpec used for
- // reading the column families
- val resultSchema = stateSchema.filter(_.colFamilyName == stateVarName).head
- keyStateEncoderSpecOpt = Some(getKeyStateEncoderSpec(resultSchema, storeMetadata))
- stateStoreColFamilySchemaOpt = Some(resultSchema)
+ if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+ // Store all column family schemas for multi-CF reading
+ stateStoreColFamilySchemas = stateSchema
+ }
+ // When reading all column families for Join V3, no specific state variable is targeted,
+ // so stateVarName defaults to DEFAULT_COL_FAMILY_NAME.
+ // However, Join V3 does not have a "default" column family. Therefore, we skip populating
+ // keyStateEncoderSpec and stateStoreColFamilySchemaOpt in this case, as there is no
+ // matching schema for the default column family name.
+ if (!isReadAllColFamiliesOnJoinV3(sourceOptions)) {
Review Comment:
For the all-CF join v3 case, you can just pick the first CF schema as the default, i.e. set `resultSchema` to the first schema, since there is no default. Then you won't need the checks I mentioned above.
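i.e. something like this sketch:
```
val resultSchema = if (isReadAllColFamiliesOnJoinV3(sourceOptions)) {
  // Join v3 has no default column family; mimic the first CF schema as the default
  // so keyStateEncoderSpecOpt / stateStoreColFamilySchemaOpt are always populated.
  stateSchema.head
} else {
  stateSchema.filter(_.colFamilyName == stateVarName).head
}
keyStateEncoderSpecOpt = Some(getKeyStateEncoderSpec(resultSchema, storeMetadata))
stateStoreColFamilySchemaOpt = Some(resultSchema)
```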
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -258,32 +262,92 @@ class StatePartitionAllColumnFamiliesReader(
partition: StateStoreInputPartition,
schema: StructType,
keyStateEncoderSpec: KeyStateEncoderSpec,
- stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema])
+ allColumnFamiliesReaderInfo: AllColumnFamiliesReaderInfo)
extends StatePartitionReaderBase(
storeConf,
hadoopConf, partition, schema,
- keyStateEncoderSpec, None, stateStoreColFamilySchemaOpt, None, None) {
+ keyStateEncoderSpec, None,
+ allColumnFamiliesReaderInfo.colFamilySchemas.find(
+ _.colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME),
+ None, None) {
- private lazy val store: ReadStateStore = {
+ private val stateStoreColFamilySchemas = allColumnFamiliesReaderInfo.colFamilySchemas
+ private val stateVariableInfos = allColumnFamiliesReaderInfo.stateVariableInfos
+
+ private def isListType(colFamilyName: String): Boolean = {
+ SchemaUtil.checkVariableType(
+ stateVariableInfos.find(info => info.stateName == colFamilyName),
+ StateVariableType.ListState)
+ }
+
+ // Override provider to register ALL column families
+ override protected lazy val provider: StateStoreProvider = {
+ val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
+ partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
+ val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
+ val useColumnFamilies = stateStoreColFamilySchemas.length > 1
+ StateStoreProvider.createAndInit(
+ stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+ useColumnFamilies, storeConf, hadoopConf.value,
+ useMultipleValuesPerKey = false, stateSchemaProvider = None)
Review Comment:
The factory passed in the `stateSchemaProvider`; we should use it.
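i.e. roughly (a sketch, assuming the factory's `stateSchemaProviderOpt` is plumbed through into this reader):
```
StateStoreProvider.createAndInit(
  stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
  useColumnFamilies, storeConf, hadoopConf.value,
  useMultipleValuesPerKey = false, stateSchemaProvider = stateSchemaProviderOpt)
```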
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -258,32 +262,92 @@ class StatePartitionAllColumnFamiliesReader(
partition: StateStoreInputPartition,
schema: StructType,
keyStateEncoderSpec: KeyStateEncoderSpec,
- stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema])
+ allColumnFamiliesReaderInfo: AllColumnFamiliesReaderInfo)
extends StatePartitionReaderBase(
storeConf,
hadoopConf, partition, schema,
- keyStateEncoderSpec, None, stateStoreColFamilySchemaOpt, None, None) {
+ keyStateEncoderSpec, None,
+ allColumnFamiliesReaderInfo.colFamilySchemas.find(
Review Comment:
The factory already passed in `stateStoreColFamilySchemaOpt`, which is the default CF; pass it in here. For join v3 this wouldn't actually be the default, it will be the CF we are mimicking as the default, which is fine here. Make the parameter name show that it is the default, i.e. defaultCol...
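Sketch (the parameter name is just a suggestion):
```
class StatePartitionAllColumnFamiliesReader(
    ..., // existing parameters unchanged
    defaultColFamilySchemaOpt: Option[StateStoreColFamilySchema],
    allColumnFamiliesReaderInfo: AllColumnFamiliesReaderInfo)
  extends StatePartitionReaderBase(
    storeConf, hadoopConf, partition, schema,
    keyStateEncoderSpec, None, defaultColFamilySchemaOpt, None, None) {
```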
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -81,25 +87,23 @@ abstract class StatePartitionReaderBase(
extends PartitionReader[InternalRow] with Logging {
// Used primarily as a placeholder for the value schema in the context of
// state variables used within the transformWithState operator.
- private val schemaForValueRow: StructType =
+ private val dummySchema: StructType =
StructType(Array(StructField("__dummy__", NullType)))
protected val keySchema : StructType = {
if (SchemaUtil.checkVariableType(stateVariableInfoOpt,
StateVariableType.MapState)) {
SchemaUtil.getCompositeKeySchema(schema, partition.sourceOptions)
} else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
- require(stateStoreColFamilySchemaOpt.isDefined)
- stateStoreColFamilySchemaOpt.map(_.keySchema).get
+ stateStoreColFamilySchemaOpt.map(_.keySchema).getOrElse(dummySchema)
Review Comment:
Ditto, you won't need this change, since it will now always be set.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -258,32 +262,92 @@ class StatePartitionAllColumnFamiliesReader(
partition: StateStoreInputPartition,
schema: StructType,
keyStateEncoderSpec: KeyStateEncoderSpec,
- stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema])
+ allColumnFamiliesReaderInfo: AllColumnFamiliesReaderInfo)
extends StatePartitionReaderBase(
storeConf,
hadoopConf, partition, schema,
- keyStateEncoderSpec, None, stateStoreColFamilySchemaOpt, None, None) {
+ keyStateEncoderSpec, None,
+ allColumnFamiliesReaderInfo.colFamilySchemas.find(
+ _.colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME),
+ None, None) {
- private lazy val store: ReadStateStore = {
+ private val stateStoreColFamilySchemas =
allColumnFamiliesReaderInfo.colFamilySchemas
+ private val stateVariableInfos =
allColumnFamiliesReaderInfo.stateVariableInfos
+
+ private def isListType(colFamilyName: String): Boolean = {
+ SchemaUtil.checkVariableType(
+ stateVariableInfos.find(info => info.stateName == colFamilyName),
+ StateVariableType.ListState)
+ }
+
+ // Override provider to register ALL column families
Review Comment:
It's good that we override it. Please remove the comment, though, since you're not registering all the column families here.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -258,32 +262,92 @@ class StatePartitionAllColumnFamiliesReader(
partition: StateStoreInputPartition,
schema: StructType,
keyStateEncoderSpec: KeyStateEncoderSpec,
- stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema])
+ allColumnFamiliesReaderInfo: AllColumnFamiliesReaderInfo)
extends StatePartitionReaderBase(
storeConf,
hadoopConf, partition, schema,
- keyStateEncoderSpec, None, stateStoreColFamilySchemaOpt, None, None) {
+ keyStateEncoderSpec, None,
+ allColumnFamiliesReaderInfo.colFamilySchemas.find(
+ _.colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME),
+ None, None) {
- private lazy val store: ReadStateStore = {
+ private val stateStoreColFamilySchemas = allColumnFamiliesReaderInfo.colFamilySchemas
+ private val stateVariableInfos = allColumnFamiliesReaderInfo.stateVariableInfos
+
+ private def isListType(colFamilyName: String): Boolean = {
+ SchemaUtil.checkVariableType(
+ stateVariableInfos.find(info => info.stateName == colFamilyName),
+ StateVariableType.ListState)
+ }
+
+ // Override provider to register ALL column families
+ override protected lazy val provider: StateStoreProvider = {
+ val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
+ partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
+ val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
+ val useColumnFamilies = stateStoreColFamilySchemas.length > 1
+ StateStoreProvider.createAndInit(
+ stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+ useColumnFamilies, storeConf, hadoopConf.value,
+ useMultipleValuesPerKey = false, stateSchemaProvider = None)
+ }
+
+ // Use a single store instance for both registering column families and iteration.
+ // We cannot abort and then get a read store because abort() invalidates the loaded version,
+ // causing getReadStore() to reload from checkpoint and clear the column family registrations.
+ private lazy val store: StateStore = {
assert(getStartStoreUniqueId == getEndStoreUniqueId,
"Start and end store unique IDs must be the same when reading all column
families")
- provider.getReadStore(
+ val stateStore = provider.getStore(
partition.sourceOptions.batchId + 1,
getStartStoreUniqueId
)
+
+ // Register all column families from the schema
+ if (stateStoreColFamilySchemas.length > 1) {
+ stateStoreColFamilySchemas.foreach { cfSchema =>
+ cfSchema.colFamilyName match {
+ case StateStore.DEFAULT_COL_FAMILY_NAME => // createAndInit has registered default
+ case _ =>
+ val isInternal = cfSchema.colFamilyName.startsWith("$")
+ val useMultipleValuesPerKey = isListType(cfSchema.colFamilyName)
+ require(cfSchema.keyStateEncoderSpec.isDefined,
+ s"keyStateEncoderSpec must be defined for column family
${cfSchema.colFamilyName}")
+ stateStore.createColFamilyIfAbsent(
+ cfSchema.colFamilyName,
+ cfSchema.keySchema,
+ cfSchema.valueSchema,
+ cfSchema.keyStateEncoderSpec.get,
+ useMultipleValuesPerKey,
+ isInternal)
+ }
+ }
Review Comment:
Let's also verify that the CF schemas we have cover all the CFs that are in the DB, i.e. you can introduce a `store.getColFamilyNames` or something that will fetch them from the DB. This is to make sure there isn't a CF in the DB that isn't in our list, to avoid silent data loss.
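Sketch of the check (assumes a new, hypothetical `getColFamilyNames()` on the store that returns the CF names physically present in the DB):
```
// Hypothetical API: names of all column families present in the underlying DB.
val dbColFamilies = stateStore.getColFamilyNames()
val knownColFamilies = stateStoreColFamilySchemas.map(_.colFamilyName).toSet
val missing = dbColFamilies -- knownColFamilies
require(missing.isEmpty,
  s"Found column families in the state store with no known schema: ${missing.mkString(", ")}")
```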