anishshri-db commented on code in PR #47445:
URL: https://github.com/apache/spark/pull/47445#discussion_r1687025381
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -382,12 +397,45 @@ case class TransformWithStateExec(
batchId: Long,
stateSchemaVersion: Int): List[StateSchemaValidationResult] = {
assert(stateSchemaVersion >= 3)
- val newColumnFamilySchemas = getColFamilySchemas()
+ val newSchemas = getColFamilySchemas()
val stateSchemaDir = stateSchemaDirPath()
- val stateSchemaFilePath = new Path(stateSchemaDir,
s"${batchId}_${UUID.randomUUID().toString}")
-
List(StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo,
hadoopConf,
- newColumnFamilySchemas.values.toList, session.sessionState,
stateSchemaVersion,
- schemaFilePath = Some(stateSchemaFilePath)))
+ val newStateSchemaFilePath =
+ new Path(stateSchemaDir, s"${batchId}_${UUID.randomUUID().toString}")
+ val operatorStateMetadata = fetchOperatorStateMetadataLog(hadoopConf,
+ getStateInfo.checkpointLocation, getStateInfo.operatorId).getLatest()
+ val oldStateSchemaFilePath: Option[Path] = operatorStateMetadata match {
+ case Some((_, metadata)) =>
+ metadata match {
+ case v2: OperatorStateMetadataV2 =>
+ Some(new Path(v2.stateStoreInfo.head.stateSchemaFilePath))
+ case _ => None
+ }
+ case None => None
+ }
+ List(StateSchemaCompatibilityChecker.
+ validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+ newSchemas.values.toList, session.sessionState, stateSchemaVersion,
+ storeName = StateStoreId.DEFAULT_STORE_NAME,
+ oldSchemaFilePath = oldStateSchemaFilePath,
+ newSchemaFilePath = Some(newStateSchemaFilePath)))
+ }
+
+ /** Metadata of this stateful operator and its states stores. */
+ override def operatorStateMetadata(
+ stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = {
Review Comment:
We don't need a default value here, right? We expect this to always be passed by the caller?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]