anishshri-db commented on code in PR #47104:
URL: https://github.com/apache/spark/pull/47104#discussion_r1663213134


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -340,11 +370,54 @@ case class TransformWithStateExec(
     )
   }
 
-  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): 
Unit = {
-    // TODO: transformWithState is special because we don't have the schema of 
the state directly
-    // within the passed args. We need to gather this after running the init 
function
-    // within the stateful processor on the driver. This also requires a 
schema format change
-    // when recording this information persistently.
+  override def validateAndMaybeEvolveStateSchema(
+      hadoopConf: Configuration,
+      batchId: Long,
+      stateSchemaVersion: Int):
+    Array[String] = {
+    assert(stateSchemaVersion >= 3)
+    val newColumnFamilySchemas = getColFamilySchemas()
+    val schemaFile = new StateSchemaV3File(
+      hadoopConf, stateSchemaFilePath().toString)
+    schemaFile.getLatest() match {
+      case Some((_, oldColumnFamilySchemas)) =>
+        validateSchemas(oldColumnFamilySchemas, newColumnFamilySchemas)
+      case None =>
+    }
+    // Write the new schema to the schema file
+    schemaFile.add(batchId, newColumnFamilySchemas.values.toList)
+    // purge oldest files
+    schemaFile.purgeOldest(child.session.sessionState.conf.minBatchesToRetain)
+    Array(schemaFile.getPathFromBatchId(batchId))
+  }
+
+  private def validateSchemas(
+      oldSchemas: List[ColumnFamilySchema],
+      newSchemas: Map[String, ColumnFamilySchema]): Unit = {
+    oldSchemas.foreach { case oldSchema: ColumnFamilySchemaV1 =>
+      newSchemas.get(oldSchema.columnFamilyName).foreach { newSchema =>
+        if (oldSchema != newSchema) {
+          throw StateStoreErrors.stateStoreColumnFamilyMismatch(
+            newSchema.columnFamilyName,
+            oldSchema.json,
+            newSchema.json)
+        }
+      }
+    }
+  }
+
+  private def stateSchemaFilePath(storeName: Option[String] = None): Path = {
+    def stateInfo = getStateInfo
+    val stateCheckpointPath =
+      new Path(getStateInfo.checkpointLocation,
+        s"${stateInfo.operatorId.toString}")
+    storeName match {

Review Comment:
   We don't expect `storeName` to be non-default here, right? If so, can we just
add an assert for that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to