HeartSaVioR commented on code in PR #47104:
URL: https://github.com/apache/spark/pull/47104#discussion_r1668189431


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA,
 KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, 
ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec}
+
+trait ColumnFamilySchemaUtils {
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getListStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+

Review Comment:
   nit: ditto



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2144,6 +2144,13 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(TimeUnit.MINUTES.toMillis(1)) // 1 minute
 
+  val STREAMING_TRANSFORM_WITH_STATE_OP_STATE_SCHEMA_VERSION =
+    buildConf("spark.sql.streaming.transformWithState.stateSchemaVersion")
+      .doc("The version of the state schema used by the transformWithState operator")
+      .version("4.0.0")
+      .intConf
+      .createWithDefault(3)

Review Comment:
   Every operator has its own state schema version - version 2 of streaming aggregation and version 2 of stream-stream join are totally different things. That said, is there any reason to start this at 3?
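   If nothing precedes this for the operator, I'd expect the default to simply start at 1, e.g. (just a sketch - hence the question):
   ```scala
   val STREAMING_TRANSFORM_WITH_STATE_OP_STATE_SCHEMA_VERSION =
     buildConf("spark.sql.streaming.transformWithState.stateSchemaVersion")
       .doc("The version of the state schema used by the transformWithState operator")
       .version("4.0.0")
       .intConf
       .createWithDefault(1) // hypothetical default; only if there is no prior schema version
   ```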



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA,
 KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, 
ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec}
+
+trait ColumnFamilySchemaUtils {
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+

Review Comment:
   nit: let's not add unnecessary empty lines (multiple blank lines in a row); one line is enough.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -131,17 +101,49 @@ class StateSchemaCompatibilityChecker(
       createSchemaFile(newKeySchema, newValueSchema)
     } else {
       // validate if the new schema is compatible with the existing schema
-      check(existingSchema.get, (newKeySchema, newValueSchema), 
ignoreValueSchema)
+      StateSchemaCompatibilityChecker.
+        check(existingSchema.get, (newKeySchema, newValueSchema), 
ignoreValueSchema)
     }
   }
 
   private def schemaFile(storeCpLocation: Path): Path =
     new Path(new Path(storeCpLocation, "_metadata"), "schema")
 }
 
-object StateSchemaCompatibilityChecker {
+object StateSchemaCompatibilityChecker extends Logging {
   val VERSION = 2
 
+

Review Comment:
   nit: unnecessary empty line



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -246,7 +246,11 @@ case class StreamingSymmetricHashJoinExec(
     watermarkUsedForStateCleanup && watermarkHasChanged
   }
 
-  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+  override def validateAndMaybeEvolveStateSchema(
+      hadoopConf: Configuration,
+      batchId: Long,
+      stateSchemaVersion: Int
+  ): Array[String] = {

Review Comment:
   nit: move this up one line
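   i.e. something like:
   ```scala
   override def validateAndMaybeEvolveStateSchema(
       hadoopConf: Configuration,
       batchId: Long,
       stateSchemaVersion: Int): Array[String] = {
   ```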



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA,
 KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, 
ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec}
+
+trait ColumnFamilySchemaUtils {
+  def getValueStateSchema[T](

Review Comment:
   nit: could this be a one-liner?
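   i.e.:
   ```scala
   def getValueStateSchema[T](stateName: String, hasTtl: Boolean): ColumnFamilySchema
   ```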



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA,
 KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, 
ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec}
+
+trait ColumnFamilySchemaUtils {
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getListStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getMapStateSchema[K, V](
+      stateName: String,
+      userKeyEnc: Encoder[K],
+      hasTtl: Boolean): ColumnFamilySchema
+}
+
+object ColumnFamilySchemaUtilsV1 extends ColumnFamilySchemaUtils {
+
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
+      stateName,
+      KEY_ROW_SCHEMA,
+      if (hasTtl) {
+        VALUE_ROW_SCHEMA_WITH_TTL
+      } else {
+        VALUE_ROW_SCHEMA
+      },
+      NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA))
+  }
+
+  def getListStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
+      stateName,
+      KEY_ROW_SCHEMA,
+      if (hasTtl) {
+        VALUE_ROW_SCHEMA_WITH_TTL
+      } else {
+        VALUE_ROW_SCHEMA
+      },
+      NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA))
+  }
+
+  def getMapStateSchema[K, V](
+      stateName: String,
+      userKeyEnc: Encoder[K],
+      hasTtl: Boolean): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(

Review Comment:
   nit: doesn't need `new` as it's a case class
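   Case classes get a generated `apply`, so this can simply be:
   ```scala
   ColumnFamilySchemaV1(
     stateName,
     KEY_ROW_SCHEMA,
     if (hasTtl) VALUE_ROW_SCHEMA_WITH_TTL else VALUE_ROW_SCHEMA,
     NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA))
   ```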



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA,
 KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, 
ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec}
+
+trait ColumnFamilySchemaUtils {
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getListStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getMapStateSchema[K, V](
+      stateName: String,
+      userKeyEnc: Encoder[K],
+      hasTtl: Boolean): ColumnFamilySchema
+}
+
+object ColumnFamilySchemaUtilsV1 extends ColumnFamilySchemaUtils {
+
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(

Review Comment:
   nit: doesn't need `new` as it's a case class



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.UUID
+
+import scala.io.{Source => IOSource}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion
+
+/**
+ * The StateSchemaV3File is used to write the schema of multiple column 
families.
+ * Right now, this is primarily used for the TransformWithState operator, 
which supports
+ * multiple column families to keep the data for multiple state variables.
+ * @param hadoopConf Hadoop configuration that is used to read / write 
metadata files.
+ * @param path Path to the directory that will be used for writing metadata.
+ */
+class StateSchemaV3File(
+    hadoopConf: Configuration,
+    path: String) {
+
+  val VERSION = 3
+
+  val metadataPath = new Path(path)
+
+  protected val fileManager: CheckpointFileManager =
+    CheckpointFileManager.create(metadataPath, hadoopConf)
+
+  if (!fileManager.exists(metadataPath)) {
+    fileManager.mkdirs(metadataPath)
+  }
+
+  def deserialize(in: InputStream): List[ColumnFamilySchema] = {
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file in the offset 
commit log")
+    }
+
+    val version = lines.next().trim
+    validateVersion(version, VERSION)
+
+    lines.map(ColumnFamilySchemaV1.fromJson).toList
+  }
+
+  def serialize(schemas: List[ColumnFamilySchema], out: OutputStream): Unit = {
+    out.write(s"v${VERSION}".getBytes(UTF_8))
+    out.write('\n')
+    out.write(schemas.map(_.json).mkString("\n").getBytes(UTF_8))
+  }
+
+  def addWithUUID(batchId: Long, metadata: List[ColumnFamilySchema]): Path = {
+    val batchMetadataPath = batchIdToPath(batchId)
+    val schemaFilePath = new Path(batchMetadataPath, 
UUID.randomUUID().toString)
+    fileManager.mkdirs(batchMetadataPath)
+    write(schemaFilePath, out => serialize(metadata, out))
+    schemaFilePath
+  }
+
+  def getWithPath(schemaFilePath: Path): List[ColumnFamilySchema] = {
+    deserialize(fileManager.open(schemaFilePath))
+  }
+
+  protected def write(
+      batchMetadataFile: Path,
+      fn: OutputStream => Unit): Unit = {
+    // Only write metadata when the batch has not yet been written

Review Comment:
   We don't expect this to happen, given the addition of the UUID. Even across multiple retries of the same batch, this will always create a new file per attempt.
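   A sketch of what I mean - with the UUID-suffixed file name there is nothing to overwrite, so the write can be unconditional (assuming `CheckpointFileManager.createAtomic` is what backs this):
   ```scala
   protected def write(schemaFilePath: Path, fn: OutputStream => Unit): Unit = {
     // every attempt writes a brand-new UUID-named file, so no "already written" check is needed
     val out = fileManager.createAtomic(schemaFilePath, overwriteIfPossible = false)
     try {
       fn(out)
       out.close()
     } catch {
       case e: Throwable =>
         out.cancel()
         throw e
     }
   }
   ```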



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.UUID
+
+import scala.io.{Source => IOSource}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion
+
+/**
+ * The StateSchemaV3File is used to write the schema of multiple column 
families.
+ * Right now, this is primarily used for the TransformWithState operator, 
which supports
+ * multiple column families to keep the data for multiple state variables.
+ * @param hadoopConf Hadoop configuration that is used to read / write 
metadata files.
+ * @param path Path to the directory that will be used for writing metadata.
+ */
+class StateSchemaV3File(
+    hadoopConf: Configuration,
+    path: String) {
+
+  val VERSION = 3
+
+  val metadataPath = new Path(path)
+
+  protected val fileManager: CheckpointFileManager =
+    CheckpointFileManager.create(metadataPath, hadoopConf)
+
+  if (!fileManager.exists(metadataPath)) {
+    fileManager.mkdirs(metadataPath)
+  }
+
+  def deserialize(in: InputStream): List[ColumnFamilySchema] = {
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file in the offset 
commit log")
+    }
+
+    val version = lines.next().trim
+    validateVersion(version, VERSION)
+
+    lines.map(ColumnFamilySchemaV1.fromJson).toList
+  }
+
+  def serialize(schemas: List[ColumnFamilySchema], out: OutputStream): Unit = {

Review Comment:
   same here



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.UUID
+
+import scala.io.{Source => IOSource}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion
+
+/**
+ * The StateSchemaV3File is used to write the schema of multiple column 
families.
+ * Right now, this is primarily used for the TransformWithState operator, 
which supports
+ * multiple column families to keep the data for multiple state variables.
+ * @param hadoopConf Hadoop configuration that is used to read / write 
metadata files.
+ * @param path Path to the directory that will be used for writing metadata.
+ */
+class StateSchemaV3File(
+    hadoopConf: Configuration,
+    path: String) {
+
+  val VERSION = 3
+
+  val metadataPath = new Path(path)
+
+  protected val fileManager: CheckpointFileManager =
+    CheckpointFileManager.create(metadataPath, hadoopConf)
+
+  if (!fileManager.exists(metadataPath)) {
+    fileManager.mkdirs(metadataPath)
+  }
+
+  def deserialize(in: InputStream): List[ColumnFamilySchema] = {

Review Comment:
   Descope this (private, or package-private for testing) if it isn't an entry point for outside callers.
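   e.g. same body, just narrowed visibility so tests in the state package can still call it:
   ```scala
   private[state] def deserialize(in: InputStream): List[ColumnFamilySchema] = {
     val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file in the offset commit log")
     }
     validateVersion(lines.next().trim, VERSION)
     lines.map(ColumnFamilySchemaV1.fromJson).toList
   }
   ```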



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl(
     }
   }
 }
+
+/**
+ * This DriverStatefulProcessorHandleImpl is used within TransformWithExec
+ * on the driver side to collect the columnFamilySchemas before any processing 
is
+ * actually done. We need this class because we can only collect the schemas 
after
+ * the StatefulProcessor is initialized.
+ */
+class DriverStatefulProcessorHandleImpl(timeMode: TimeMode)
+  extends StatefulProcessorHandleImplBase(timeMode) {
+
+  private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1
+
+  // Because this is only happening on the driver side, there is only
+  // one task modifying and accessing this map at a time
+  private[sql] val columnFamilySchemas: mutable.Map[String, 
ColumnFamilySchema] =
+    new mutable.HashMap[String, ColumnFamilySchema]()
+
+  def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = 
columnFamilySchemas.toMap
+
+  /**
+   * Function to add the ValueState schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  override def getValueState[T](stateName: String, valEncoder: Encoder[T]): 
ValueState[T] = {
+    verifyStateVarOperations("get_value_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getValueStateSchema(stateName, false)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ValueState[T]]
+  }
+
+  /**
+   * Function to add the ValueStateWithTTL schema to the list of column family 
schemas.

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl(
     }
   }
 }
+
+/**
+ * This DriverStatefulProcessorHandleImpl is used within TransformWithExec
+ * on the driver side to collect the columnFamilySchemas before any processing 
is
+ * actually done. We need this class because we can only collect the schemas 
after
+ * the StatefulProcessor is initialized.
+ */
+class DriverStatefulProcessorHandleImpl(timeMode: TimeMode)
+  extends StatefulProcessorHandleImplBase(timeMode) {
+
+  private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1
+
+  // Because this is only happening on the driver side, there is only
+  // one task modifying and accessing this map at a time
+  private[sql] val columnFamilySchemas: mutable.Map[String, 
ColumnFamilySchema] =
+    new mutable.HashMap[String, ColumnFamilySchema]()
+
+  def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = 
columnFamilySchemas.toMap
+
+  /**

Review Comment:
   nit: Is this the same as the method doc in the base interface/class? If so, you can just skip it here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl(
     }
   }
 }
+
+/**
+ * This DriverStatefulProcessorHandleImpl is used within TransformWithExec
+ * on the driver side to collect the columnFamilySchemas before any processing 
is
+ * actually done. We need this class because we can only collect the schemas 
after
+ * the StatefulProcessor is initialized.
+ */
+class DriverStatefulProcessorHandleImpl(timeMode: TimeMode)
+  extends StatefulProcessorHandleImplBase(timeMode) {
+
+  private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1
+
+  // Because this is only happening on the driver side, there is only
+  // one task modifying and accessing this map at a time
+  private[sql] val columnFamilySchemas: mutable.Map[String, 
ColumnFamilySchema] =
+    new mutable.HashMap[String, ColumnFamilySchema]()
+
+  def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = 
columnFamilySchemas.toMap
+
+  /**
+   * Function to add the ValueState schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  override def getValueState[T](stateName: String, valEncoder: Encoder[T]): 
ValueState[T] = {
+    verifyStateVarOperations("get_value_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getValueStateSchema(stateName, false)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ValueState[T]]
+  }
+
+  /**
+   * Function to add the ValueStateWithTTL schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @param ttlConfig  - the ttl configuration (time to live duration etc.)
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  override def getValueState[T](
+      stateName: String,
+      valEncoder: Encoder[T],
+      ttlConfig: TTLConfig): ValueState[T] = {
+    verifyStateVarOperations("get_value_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getValueStateSchema(stateName, true)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ValueState[T]]
+  }
+
+  /**
+   * Function to add the ListState schema to the list of column family schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @tparam T - type of state variable
+   * @return - instance of ListState of type T that can be used to store state 
persistently
+   */
+  override def getListState[T](stateName: String, valEncoder: Encoder[T]): 
ListState[T] = {
+    verifyStateVarOperations("get_list_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getListStateSchema(stateName, false)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ListState[T]]
+  }
+
+  /**
+   * Function to add the ListStateWithTTL schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @param ttlConfig  - the ttl configuration (time to live duration etc.)
+   * @tparam T - type of state variable
+   * @return - instance of ListState of type T that can be used to store state 
persistently
+   */
+  override def getListState[T](
+      stateName: String,
+      valEncoder: Encoder[T],
+      ttlConfig: TTLConfig): ListState[T] = {
+    verifyStateVarOperations("get_list_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getListStateSchema(stateName, true)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ListState[T]]
+  }
+
+  /**
+   * Function to add the MapState schema to the list of column family schemas.

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -92,6 +93,35 @@ case class TransformWithStateExec(
     }
   }
 
+  /**
+   * We initialize this processor handle in the driver to run the init function
+   * and fetch the schemas of the state variables initialized in this 
processor.
+   * @return a new instance of the driver processor handle
+   */
+  private def getDriverProcessorHandle: DriverStatefulProcessorHandleImpl = {

Review Comment:
   nit: let's use `()` explicitly when it's not just a property of the class but does something more.
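   i.e.:
   ```scala
   private def getDriverProcessorHandle(): DriverStatefulProcessorHandleImpl = {
     val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(timeMode)
     driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT)
     statefulProcessor.setHandle(driverProcessorHandle)
     statefulProcessor.init(outputMode, timeMode)
     driverProcessorHandle
   }
   ```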



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -92,6 +93,35 @@ case class TransformWithStateExec(
     }
   }
 
+  /**
+   * We initialize this processor handle in the driver to run the init function
+   * and fetch the schemas of the state variables initialized in this 
processor.
+   * @return a new instance of the driver processor handle
+   */
+  private def getDriverProcessorHandle: DriverStatefulProcessorHandleImpl = {
+    val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(timeMode)
+    driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT)
+    statefulProcessor.setHandle(driverProcessorHandle)
+    statefulProcessor.init(outputMode, timeMode)
+    driverProcessorHandle
+  }
+
+  /**
+   * Fetching the columnFamilySchemas from the StatefulProcessorHandle
+   * after init is called.
+   */
+  private def getColFamilySchemas(): Map[String, ColumnFamilySchema] = {
+    val driverProcessorHandle = getDriverProcessorHandle
+    val columnFamilySchemas = driverProcessorHandle.getColumnFamilySchemas
+    closeProcessorHandle()

Review Comment:
   nit: Let's inline it if it's only used here. It's just two lines.
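   Roughly like this - I'm guessing the two lines inside `closeProcessorHandle()`, so treat the inlined body as hypothetical:
   ```scala
   private def getColFamilySchemas(): Map[String, ColumnFamilySchema] = {
     val driverProcessorHandle = getDriverProcessorHandle()
     val columnFamilySchemas = driverProcessorHandle.getColumnFamilySchemas
     // hypothetical inlined body of closeProcessorHandle()
     statefulProcessor.close()
     statefulProcessor.setHandle(null)
     columnFamilySchemas
   }
   ```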



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -340,11 +370,47 @@ case class TransformWithStateExec(
     )
   }
 
-  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
-    // TODO: transformWithState is special because we don't have the schema of the state directly
-    // within the passed args. We need to gather this after running the init function
-    // within the stateful processor on the driver. This also requires a schema format change
-    // when recording this information persistently.
+  override def validateAndMaybeEvolveStateSchema(
+      hadoopConf: Configuration,
+      batchId: Long,
+      stateSchemaVersion: Int):
+    Array[String] = {
+    assert(stateSchemaVersion >= 3)
+    val newColumnFamilySchemas = getColFamilySchemas()
+    val schemaFile = new StateSchemaV3File(
+      hadoopConf, stateSchemaFilePath(StateStoreId.DEFAULT_STORE_NAME).toString)
+    // TODO: Read the schema path from the OperatorStateMetadata file

Review Comment:
   Same here; better to have an explicit JIRA ticket for every TODO.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -187,23 +189,29 @@ class IncrementalExecution(
     }
   }
 
-  object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule {
-    override val rule: PartialFunction[SparkPlan, SparkPlan] = {
-      case stateStoreWriter: StateStoreWriter if isFirstBatch =>
-        val metadata = stateStoreWriter.operatorStateMetadata()
-        val metadataWriter = new OperatorStateMetadataWriter(new Path(
-          checkpointLocation, 
stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf)
-        metadataWriter.write(metadata)
-        stateStoreWriter
-    }
-  }
-
   // Planning rule used to record the state schema for the first run and 
validate state schema
   // changes across query runs.
-  object StateSchemaValidationRule extends SparkPlanPartialRule {
+  object StateSchemaAndOperatorMetadataRule extends SparkPlanPartialRule {
     override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      // In the case of TransformWithStateExec, we want to collect this 
StateSchema
+      // filepath, and write this path out in the OperatorStateMetadata file
       case statefulOp: StatefulOperator if isFirstBatch =>
-        statefulOp.validateAndMaybeEvolveStateSchema(hadoopConf)
+        val stateSchemaVersion = statefulOp match {
+          case _: TransformWithStateExec => sparkSession.sessionState.conf.
+            
getConf(SQLConf.STREAMING_TRANSFORM_WITH_STATE_OP_STATE_SCHEMA_VERSION)
+          case _ => STATE_SCHEMA_DEFAULT_VERSION
+        }
+        val stateSchemaPaths = statefulOp.
+          validateAndMaybeEvolveStateSchema(hadoopConf, currentBatchId, 
stateSchemaVersion)
+        // write out the state schema paths to the metadata file
+        statefulOp match {
+          case stateStoreWriter: StateStoreWriter =>
+            val metadata = stateStoreWriter.operatorStateMetadata()
+            // TODO: Populate metadata with stateSchemaPaths if metadata 
version is v2

Review Comment:
   Also, what is the plan for non-transformWithState operators? Will they stick to metadata version v1, or will we track the schema file for them as well?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -340,11 +370,47 @@ case class TransformWithStateExec(
     )
   }
 
-  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
-    // TODO: transformWithState is special because we don't have the schema of the state directly
-    // within the passed args. We need to gather this after running the init function
-    // within the stateful processor on the driver. This also requires a schema format change
-    // when recording this information persistently.
+  override def validateAndMaybeEvolveStateSchema(
+      hadoopConf: Configuration,
+      batchId: Long,
+      stateSchemaVersion: Int):
+    Array[String] = {

Review Comment:
   nit: move this up one line



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA,
 KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, 
ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec}
+
+trait ColumnFamilySchemaUtils {
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getListStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getMapStateSchema[K, V](
+      stateName: String,
+      userKeyEnc: Encoder[K],
+      hasTtl: Boolean): ColumnFamilySchema
+}
+
+object ColumnFamilySchemaUtilsV1 extends ColumnFamilySchemaUtils {
+
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
+      stateName,
+      KEY_ROW_SCHEMA,
+      if (hasTtl) {
+        VALUE_ROW_SCHEMA_WITH_TTL
+      } else {
+        VALUE_ROW_SCHEMA
+      },
+      NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA))
+  }
+
+  def getListStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(

Review Comment:
   nit: doesn't need `new` as it's a case class



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala:
##########
@@ -28,6 +33,57 @@ import org.apache.spark.util.Utils
 /**
  * Helper classes for reading/writing state schema.
  */
+sealed trait ColumnFamilySchema extends Serializable {
+  def jsonValue: JValue
+
+  def json: String
+
+  def columnFamilyName: String
+}
+
+case class ColumnFamilySchemaV1(
+    columnFamilyName: String,
+    keySchema: StructType,
+    valueSchema: StructType,
+    keyStateEncoderSpec: KeyStateEncoderSpec,
+    userKeyEncoder: Option[StructType] = None) extends ColumnFamilySchema {
+  def jsonValue: JValue = {
+    ("columnFamilyName" -> JString(columnFamilyName)) ~
+      ("keySchema" -> JString(keySchema.json)) ~
+      ("valueSchema" -> JString(valueSchema.json)) ~
+      ("keyStateEncoderSpec" -> keyStateEncoderSpec.jsonValue) ~
+      ("userKeyEncoder" -> userKeyEncoder.map(s => 
JString(s.json)).getOrElse(JNothing))
+  }
+
+  def json: String = {
+    compact(render(jsonValue))
+  }
+}
+
+object ColumnFamilySchemaV1 {
+
+  /**
+   * Create a ColumnFamilySchemaV1 object from the Json string
+   * This function is to read the StateSchemaV3 file
+   */
+  def fromJson(json: String): ColumnFamilySchema = {
+    implicit val formats: DefaultFormats.type = DefaultFormats
+    val colFamilyMap = JsonMethods.parse(json).extract[Map[String, Any]]
+    assert(colFamilyMap.isInstanceOf[Map[_, _]],
+      s"Expected Map but got ${colFamilyMap.getClass}")
+    val keySchema = 
StructType.fromString(colFamilyMap("keySchema").asInstanceOf[String])
+    val valueSchema = 
StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String])
+    new ColumnFamilySchemaV1(

Review Comment:
   nit: doesn't need `new` as it's a case class



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -278,16 +282,45 @@ case class StateStoreCustomTimingMetric(name: String, 
desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
-sealed trait KeyStateEncoderSpec
+sealed trait KeyStateEncoderSpec {
+  def jsonValue: JValue
+  def json: String = compact(render(jsonValue))
+}
 
-case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends 
KeyStateEncoderSpec
+object KeyStateEncoderSpec {
+  def fromJson(keySchema: StructType, m: Map[String, Any]): 
KeyStateEncoderSpec = {
+    // match on type
+    m("keyStateEncoderType").asInstanceOf[String] match {

Review Comment:
   IMO, except for the first routing, instantiation of the spec from JSON should happen in its own implementation (a companion object, for example). This way serialization and deserialization stay coupled together.
   
   But I get that it adds more boilerplate code, so I'll leave this as a matter of preference.
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.UUID
+
+import scala.io.{Source => IOSource}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion
+
+/**
+ * The StateSchemaV3File is used to write the schema of multiple column 
families.
+ * Right now, this is primarily used for the TransformWithState operator, 
which supports
+ * multiple column families to keep the data for multiple state variables.
+ * @param hadoopConf Hadoop configuration that is used to read / write 
metadata files.
+ * @param path Path to the directory that will be used for writing metadata.
+ */
+class StateSchemaV3File(

Review Comment:
   What is the versioning policy for the state schema file vs. ColumnFamilySchema?
   
   Do we expect to bump the state schema file version whenever we need to bump the ColumnFamilySchema version? Or will we consider these separately and give ColumnFamilySchema its own versioning? If the former, please call it out explicitly. If the latter, please put the ColumnFamilySchema version information into the file, so that we can read it based on its own versioning.
   
   Also, this certainly diverges from the previous versions. Is there any chance for us to generalize/normalize? E.g. read v1/v2 from the file and instantiate v3, and vice versa (store a scoped v3 - default column family only - to the file as v1/v2). We would eventually want to see this combined rather than scattered.
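   For the latter option, a sketch of what I mean by recording the ColumnFamilySchema version in the file (line format is illustrative only):
   ```scala
   def serialize(schemas: List[ColumnFamilySchema], out: OutputStream): Unit = {
     out.write(s"v${VERSION}".getBytes(UTF_8))
     out.write('\n')
     // hypothetical extra line: lets ColumnFamilySchema evolve independently of the file version
     out.write("cfSchemaVersion=1".getBytes(UTF_8))
     out.write('\n')
     out.write(schemas.map(_.json).mkString("\n").getBytes(UTF_8))
   }
   ```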



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA,
 KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, 
ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec}
+
+trait ColumnFamilySchemaUtils {
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getListStateSchema[T](

Review Comment:
   nit: could this be a one-liner?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -187,23 +189,29 @@ class IncrementalExecution(
     }
   }
 
-  object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule {
-    override val rule: PartialFunction[SparkPlan, SparkPlan] = {
-      case stateStoreWriter: StateStoreWriter if isFirstBatch =>
-        val metadata = stateStoreWriter.operatorStateMetadata()
-        val metadataWriter = new OperatorStateMetadataWriter(new Path(
-          checkpointLocation, 
stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf)
-        metadataWriter.write(metadata)
-        stateStoreWriter
-    }
-  }
-
   // Planning rule used to record the state schema for the first run and 
validate state schema
   // changes across query runs.
-  object StateSchemaValidationRule extends SparkPlanPartialRule {
+  object StateSchemaAndOperatorMetadataRule extends SparkPlanPartialRule {
     override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      // In the case of TransformWithStateExec, we want to collect this 
StateSchema
+      // filepath, and write this path out in the OperatorStateMetadata file
       case statefulOp: StatefulOperator if isFirstBatch =>
-        statefulOp.validateAndMaybeEvolveStateSchema(hadoopConf)
+        val stateSchemaVersion = statefulOp match {
+          case _: TransformWithStateExec => sparkSession.sessionState.conf.
+            
getConf(SQLConf.STREAMING_TRANSFORM_WITH_STATE_OP_STATE_SCHEMA_VERSION)
+          case _ => STATE_SCHEMA_DEFAULT_VERSION
+        }
+        val stateSchemaPaths = statefulOp.
+          validateAndMaybeEvolveStateSchema(hadoopConf, currentBatchId, 
stateSchemaVersion)
+        // write out the state schema paths to the metadata file
+        statefulOp match {
+          case stateStoreWriter: StateStoreWriter =>
+            val metadata = stateStoreWriter.operatorStateMetadata()
+            // TODO: Populate metadata with stateSchemaPaths if metadata 
version is v2

Review Comment:
   Let's be sure to file a JIRA ticket and put the ticket number here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.UUID
+
+import scala.io.{Source => IOSource}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion
+
+/**
+ * The StateSchemaV3File is used to write the schema of multiple column 
families.
+ * Right now, this is primarily used for the TransformWithState operator, 
which supports
+ * multiple column families to keep the data for multiple state variables.
+ * @param hadoopConf Hadoop configuration that is used to read / write 
metadata files.
+ * @param path Path to the directory that will be used for writing metadata.
+ */
+class StateSchemaV3File(
+    hadoopConf: Configuration,
+    path: String) {
+
+  val VERSION = 3
+
+  val metadataPath = new Path(path)
+
+  protected val fileManager: CheckpointFileManager =
+    CheckpointFileManager.create(metadataPath, hadoopConf)
+
+  if (!fileManager.exists(metadataPath)) {
+    fileManager.mkdirs(metadataPath)
+  }
+
+  def deserialize(in: InputStream): List[ColumnFamilySchema] = {
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file in the offset 
commit log")
+    }
+
+    val version = lines.next().trim
+    validateVersion(version, VERSION)
+
+    lines.map(ColumnFamilySchemaV1.fromJson).toList
+  }
+
+  def serialize(schemas: List[ColumnFamilySchema], out: OutputStream): Unit = {
+    out.write(s"v${VERSION}".getBytes(UTF_8))
+    out.write('\n')
+    out.write(schemas.map(_.json).mkString("\n").getBytes(UTF_8))
+  }
+
+  def addWithUUID(batchId: Long, metadata: List[ColumnFamilySchema]): Path = {
+    val batchMetadataPath = batchIdToPath(batchId)
+    val schemaFilePath = new Path(batchMetadataPath, 
UUID.randomUUID().toString)
+    fileManager.mkdirs(batchMetadataPath)
+    write(schemaFilePath, out => serialize(metadata, out))
+    schemaFilePath
+  }
+
+  def getWithPath(schemaFilePath: Path): List[ColumnFamilySchema] = {
+    deserialize(fileManager.open(schemaFilePath))
+  }
+
+  protected def write(
+      batchMetadataFile: Path,
+      fn: OutputStream => Unit): Unit = {
+    // Only write metadata when the batch has not yet been written

Review Comment:
   We don't expect this to happen, given the addition of the UUID. Even across multiple retries of the same batch, this will always create a new file per attempt.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -201,5 +203,6 @@ object StateSchemaCompatibilityChecker {
     if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
       throw result.get
     }
+    Array(checker.schemaFileLocation.toString)

Review Comment:
   I guess this won't be used for v3, as the schema file location is meant to be UUID-postfixed. Do I understand correctly?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.UUID
+
+import scala.io.{Source => IOSource}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion
+
+/**
+ * The StateSchemaV3File is used to write the schema of multiple column 
families.
+ * Right now, this is primarily used for the TransformWithState operator, 
which supports
+ * multiple column families to keep the data for multiple state variables.
+ * @param hadoopConf Hadoop configuration that is used to read / write 
metadata files.
+ * @param path Path to the directory that will be used for writing metadata.
+ */
+class StateSchemaV3File(
+    hadoopConf: Configuration,
+    path: String) {
+
+  val VERSION = 3
+
+  val metadataPath = new Path(path)
+
+  protected val fileManager: CheckpointFileManager =
+    CheckpointFileManager.create(metadataPath, hadoopConf)
+
+  if (!fileManager.exists(metadataPath)) {
+    fileManager.mkdirs(metadataPath)
+  }
+
+  def deserialize(in: InputStream): List[ColumnFamilySchema] = {
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file in the offset 
commit log")
+    }
+
+    val version = lines.next().trim
+    validateVersion(version, VERSION)
+
+    lines.map(ColumnFamilySchemaV1.fromJson).toList
+  }
+
+  def serialize(schemas: List[ColumnFamilySchema], out: OutputStream): Unit = {
+    out.write(s"v${VERSION}".getBytes(UTF_8))
+    out.write('\n')
+    out.write(schemas.map(_.json).mkString("\n").getBytes(UTF_8))
+  }
+
+  def addWithUUID(batchId: Long, metadata: List[ColumnFamilySchema]): Path = {
+    val batchMetadataPath = batchIdToPath(batchId)

Review Comment:
   Can we clarify why we need an additional directory per write? Have we considered simply using files in the same directory? The pointers to the valid schema files are available in the checkpoint anyway, so I have yet to understand the benefit of doing this - it adds more cost on deletion (if the storage requires us to delete the directory separately).
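   For comparison, the flat layout I have in mind - one file per write, no per-batch directory (sketch):
   ```scala
   def addWithUUID(batchId: Long, metadata: List[ColumnFamilySchema]): Path = {
     // file name still carries the batch id and a UUID, but everything lives directly under metadataPath
     val schemaFilePath = new Path(metadataPath, s"${batchId}_${UUID.randomUUID().toString}")
     write(schemaFilePath, out => serialize(metadata, out))
     schemaFilePath
   }
   ```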



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.UUID
+
+import scala.io.{Source => IOSource}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion
+
+/**
+ * The StateSchemaV3File is used to write the schema of multiple column 
families.
+ * Right now, this is primarily used for the TransformWithState operator, 
which supports
+ * multiple column families to keep the data for multiple state variables.
+ * @param hadoopConf Hadoop configuration that is used to read / write 
metadata files.
+ * @param path Path to the directory that will be used for writing metadata.
+ */
+class StateSchemaV3File(
+    hadoopConf: Configuration,
+    path: String) {
+
+  val VERSION = 3

Review Comment:
   Best practice for constants is to place them in the companion object.
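   i.e.:
   ```scala
   object StateSchemaV3File {
     val VERSION = 3
   }
   ```
   and reference it as `StateSchemaV3File.VERSION` from the class.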



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -85,6 +85,8 @@ class IncrementalExecution(
     .map(SQLConf.SHUFFLE_PARTITIONS.valueConverter)
     .getOrElse(sparkSession.sessionState.conf.numShufflePartitions)
 
+  private val STATE_SCHEMA_DEFAULT_VERSION: Int = 2

Review Comment:
   Let's add a code comment explaining what this means and which operators it applies to vs. which it doesn't. (For this case, it can simply say `except transformWithState`.)
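   e.g.:
   ```scala
   // State schema version used by all stateful operators except transformWithState,
   // which reads its version from spark.sql.streaming.transformWithState.stateSchemaVersion.
   private val STATE_SCHEMA_DEFAULT_VERSION: Int = 2
   ```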



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl(
     }
   }
 }
+
+/**
+ * This DriverStatefulProcessorHandleImpl is used within TransformWithExec
+ * on the driver side to collect the columnFamilySchemas before any processing 
is
+ * actually done. We need this class because we can only collect the schemas 
after
+ * the StatefulProcessor is initialized.
+ */
+class DriverStatefulProcessorHandleImpl(timeMode: TimeMode)
+  extends StatefulProcessorHandleImplBase(timeMode) {
+
+  private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1
+
+  // Because this is only happening on the driver side, there is only
+  // one task modifying and accessing this map at a time
+  private[sql] val columnFamilySchemas: mutable.Map[String, 
ColumnFamilySchema] =
+    new mutable.HashMap[String, ColumnFamilySchema]()
+
+  def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = 
columnFamilySchemas.toMap
+
+  /**
+   * Function to add the ValueState schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  override def getValueState[T](stateName: String, valEncoder: Encoder[T]): 
ValueState[T] = {
+    verifyStateVarOperations("get_value_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getValueStateSchema(stateName, false)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ValueState[T]]
+  }
+
+  /**
+   * Function to add the ValueStateWithTTL schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @param ttlConfig  - the ttl configuration (time to live duration etc.)
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  override def getValueState[T](
+      stateName: String,
+      valEncoder: Encoder[T],
+      ttlConfig: TTLConfig): ValueState[T] = {
+    verifyStateVarOperations("get_value_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getValueStateSchema(stateName, true)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ValueState[T]]
+  }
+
+  /**
+   * Function to add the ListState schema to the list of column family schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @tparam T - type of state variable
+   * @return - instance of ListState of type T that can be used to store state 
persistently
+   */
+  override def getListState[T](stateName: String, valEncoder: Encoder[T]): 
ListState[T] = {
+    verifyStateVarOperations("get_list_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getListStateSchema(stateName, false)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ListState[T]]
+  }
+
+  /**
+   * Function to add the ListStateWithTTL schema to the list of column family 
schemas.

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala:
##########
@@ -28,6 +33,57 @@ import org.apache.spark.util.Utils
 /**
  * Helper classes for reading/writing state schema.
  */
+sealed trait ColumnFamilySchema extends Serializable {
+  def jsonValue: JValue
+
+  def json: String
+
+  def columnFamilyName: String
+}
+
+case class ColumnFamilySchemaV1(
+    columnFamilyName: String,
+    keySchema: StructType,
+    valueSchema: StructType,
+    keyStateEncoderSpec: KeyStateEncoderSpec,
+    userKeyEncoder: Option[StructType] = None) extends ColumnFamilySchema {

Review Comment:
   I feel like userKeyEncoder is too coupled with transformWithState, but I 
can't imagine we would have a more complicated stateful operator 
implementation, so it sounds OK to me.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA,
 KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, 
ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec}
+
+trait ColumnFamilySchemaUtils {
+  def getValueStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getListStateSchema[T](
+      stateName: String,
+      hasTtl: Boolean): ColumnFamilySchema
+
+
+  def getMapStateSchema[K, V](
+      stateName: String,
+      userKeyEnc: Encoder[K],
+      hasTtl: Boolean): ColumnFamilySchema
+}
+
+object ColumnFamilySchemaUtilsV1 extends ColumnFamilySchemaUtils {
+
+  def getValueStateSchema[T](

Review Comment:
   Will we have corresponding metadata information for the operator? The schema 
information here does not cover how the state data source reader can read the 
state back. I assume there will be another code change to support the state 
data source reader, but I just wanted to double check.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl(
     }
   }
 }
+
+/**
+ * This DriverStatefulProcessorHandleImpl is used within TransformWithExec
+ * on the driver side to collect the columnFamilySchemas before any processing 
is
+ * actually done. We need this class because we can only collect the schemas 
after
+ * the StatefulProcessor is initialized.
+ */
+class DriverStatefulProcessorHandleImpl(timeMode: TimeMode)
+  extends StatefulProcessorHandleImplBase(timeMode) {
+
+  private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1
+
+  // Because this is only happening on the driver side, there is only
+  // one task modifying and accessing this map at a time
+  private[sql] val columnFamilySchemas: mutable.Map[String, 
ColumnFamilySchema] =
+    new mutable.HashMap[String, ColumnFamilySchema]()
+
+  def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = 
columnFamilySchemas.toMap
+
+  /**
+   * Function to add the ValueState schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  override def getValueState[T](stateName: String, valEncoder: Encoder[T]): 
ValueState[T] = {
+    verifyStateVarOperations("get_value_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getValueStateSchema(stateName, false)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ValueState[T]]
+  }
+
+  /**
+   * Function to add the ValueStateWithTTL schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @param ttlConfig  - the ttl configuration (time to live duration etc.)
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  override def getValueState[T](
+      stateName: String,
+      valEncoder: Encoder[T],
+      ttlConfig: TTLConfig): ValueState[T] = {
+    verifyStateVarOperations("get_value_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getValueStateSchema(stateName, true)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ValueState[T]]
+  }
+
+  /**
+   * Function to add the ListState schema to the list of column family schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @tparam T - type of state variable
+   * @return - instance of ListState of type T that can be used to store state 
persistently
+   */
+  override def getListState[T](stateName: String, valEncoder: Encoder[T]): 
ListState[T] = {
+    verifyStateVarOperations("get_list_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getListStateSchema(stateName, false)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ListState[T]]
+  }
+
+  /**
+   * Function to add the ListStateWithTTL schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @param ttlConfig  - the ttl configuration (time to live duration etc.)
+   * @tparam T - type of state variable
+   * @return - instance of ListState of type T that can be used to store state 
persistently
+   */
+  override def getListState[T](
+      stateName: String,
+      valEncoder: Encoder[T],
+      ttlConfig: TTLConfig): ListState[T] = {
+    verifyStateVarOperations("get_list_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getListStateSchema(stateName, true)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ListState[T]]
+  }
+
+  /**
+   * Function to add the MapState schema to the list of column family schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   * @param stateName  - name of the state variable
+   * @param userKeyEnc - spark sql encoder for the map key
+   * @param valEncoder - spark sql encoder for the map value
+   * @tparam K - type of key for map state variable
+   * @tparam V - type of value for map state variable
+   * @return - instance of MapState of type [K,V] that can be used to store 
state persistently
+   */
+  override def getMapState[K, V](
+      stateName: String,
+      userKeyEnc: Encoder[K],
+      valEncoder: Encoder[V]): MapState[K, V] = {
+    verifyStateVarOperations("get_map_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getMapStateSchema(stateName, userKeyEnc, false)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[MapState[K, V]]
+  }
+
+  /**
+   * Function to add the MapStateWithTTL schema to the list of column family 
schemas.

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl(
     }
   }
 }
+
+/**
+ * This DriverStatefulProcessorHandleImpl is used within TransformWithExec
+ * on the driver side to collect the columnFamilySchemas before any processing 
is
+ * actually done. We need this class because we can only collect the schemas 
after
+ * the StatefulProcessor is initialized.
+ */
+class DriverStatefulProcessorHandleImpl(timeMode: TimeMode)
+  extends StatefulProcessorHandleImplBase(timeMode) {
+
+  private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1
+
+  // Because this is only happening on the driver side, there is only
+  // one task modifying and accessing this map at a time
+  private[sql] val columnFamilySchemas: mutable.Map[String, 
ColumnFamilySchema] =
+    new mutable.HashMap[String, ColumnFamilySchema]()
+
+  def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = 
columnFamilySchemas.toMap
+
+  /**
+   * Function to add the ValueState schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  override def getValueState[T](stateName: String, valEncoder: Encoder[T]): 
ValueState[T] = {
+    verifyStateVarOperations("get_value_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getValueStateSchema(stateName, false)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ValueState[T]]
+  }
+
+  /**
+   * Function to add the ValueStateWithTTL schema to the list of column family 
schemas.
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   *
+   * @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
+   * @param ttlConfig  - the ttl configuration (time to live duration etc.)
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  override def getValueState[T](
+      stateName: String,
+      valEncoder: Encoder[T],
+      ttlConfig: TTLConfig): ValueState[T] = {
+    verifyStateVarOperations("get_value_state", PRE_INIT)
+    val colFamilySchema = columnFamilySchemaUtils.
+      getValueStateSchema(stateName, true)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    null.asInstanceOf[ValueState[T]]
+  }
+
+  /**
+   * Function to add the ListState schema to the list of column family schemas.

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -340,11 +370,47 @@ case class TransformWithStateExec(
     )
   }
 
-  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): 
Unit = {
-    // TODO: transformWithState is special because we don't have the schema of 
the state directly
-    // within the passed args. We need to gather this after running the init 
function
-    // within the stateful processor on the driver. This also requires a 
schema format change
-    // when recording this information persistently.
+  override def validateAndMaybeEvolveStateSchema(
+      hadoopConf: Configuration,
+      batchId: Long,
+      stateSchemaVersion: Int):
+    Array[String] = {
+    assert(stateSchemaVersion >= 3)
+    val newColumnFamilySchemas = getColFamilySchemas()
+    val schemaFile = new StateSchemaV3File(
+      hadoopConf, 
stateSchemaFilePath(StateStoreId.DEFAULT_STORE_NAME).toString)
+    // TODO: Read the schema path from the OperatorStateMetadata file
+    // and validate it with the new schema
+
+    // Write the new schema to the schema file
+    val schemaPath = schemaFile.addWithUUID(batchId, 
newColumnFamilySchemas.values.toList)
+    Array(schemaPath.toString)
+  }
+
+  private def validateSchemas(
+      oldSchemas: List[ColumnFamilySchema],
+      newSchemas: Map[String, ColumnFamilySchema]): Unit = {
+    oldSchemas.foreach { case oldSchema: ColumnFamilySchemaV1 =>
+      newSchemas.get(oldSchema.columnFamilyName).foreach {
+        case newSchema: ColumnFamilySchemaV1 =>
+          StateSchemaCompatibilityChecker.check(
+            (oldSchema.keySchema, oldSchema.valueSchema),
+            (newSchema.keySchema, newSchema.valueSchema),
+            ignoreValueSchema = false
+          )
+      }
+    }
+  }
+
+  private def stateSchemaFilePath(storeName: String): Path = {

Review Comment:
   nit: include `Dir` in the name (e.g. `stateSchemaDirPath`) to make it clear 
this points to a directory and to avoid confusion.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########


Review Comment:
   Can we have round-trip serde tests for all specs <-> JSON? Here we only have 
tests leveraging a single spec, and I don't think that's sufficient. You can 
put them in a separate test suite to avoid adding further test complexity here.
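
   For illustration, a rough sketch of such a round-trip test (the 
   `ColumnFamilySchemaV1.fromJson` helper is assumed here for the 
   deserialization side and is not taken from this PR; adjust to whatever the 
   actual API is):

   ```scala
   import org.apache.spark.SparkFunSuite
   import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema._
   import org.apache.spark.sql.execution.streaming.state._

   class ColumnFamilySchemaSerdeSuite extends SparkFunSuite {
     test("ColumnFamilySchemaV1 JSON round trip for all key encoder specs") {
       // pair each key encoder spec with the key schema it encodes
       val cases = Seq(
         (KEY_ROW_SCHEMA, NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA)),
         (COMPOSITE_KEY_ROW_SCHEMA,
           PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1)))
       cases.foreach { case (keySchema, spec) =>
         val original = ColumnFamilySchemaV1(
           "testCf", keySchema, VALUE_ROW_SCHEMA, spec)
         // serialize to JSON, read back, and compare
         val restored = ColumnFamilySchemaV1.fromJson(original.json) // assumed helper
         assert(restored === original)
       }
     }
   }
   ```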



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl(
     }
   }
 }
+
+/**
+ * This DriverStatefulProcessorHandleImpl is used within TransformWithExec
+ * on the driver side to collect the columnFamilySchemas before any processing 
is
+ * actually done. We need this class because we can only collect the schemas 
after
+ * the StatefulProcessor is initialized.
+ */
+class DriverStatefulProcessorHandleImpl(timeMode: TimeMode)
+  extends StatefulProcessorHandleImplBase(timeMode) {
+
+  private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1
+
+  // Because this is only happening on the driver side, there is only
+  // one task modifying and accessing this map at a time
+  private[sql] val columnFamilySchemas: mutable.Map[String, 
ColumnFamilySchema] =
+    new mutable.HashMap[String, ColumnFamilySchema]()
+
+  def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = 
columnFamilySchemas.toMap
+
+  /**

Review Comment:
   Btw, I'm not sure users will try to do anything other than assigning the 
returned value to a field, but if they ever do, it will lead to an NPE.
   
   We probably need to document this in the method doc of the base class? cc. 
@anishshri-db 
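
   For example, the base-class method doc could carry a note along these lines 
   (the wording below is only a suggestion):

   ```scala
   /**
    * ...
    * Note: when this is called on the driver-side pre-init handle, it only
    * records the column family schema and returns null. The returned state
    * variable must only be assigned to a field inside `init()`; calling any
    * method on it before the executor-side handle is bound will throw an NPE.
    */
   def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T]
   ```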



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -92,6 +93,35 @@ case class TransformWithStateExec(
     }
   }
 
+  /**
+   * We initialize this processor handle in the driver to run the init function
+   * and fetch the schemas of the state variables initialized in this 
processor.
+   * @return a new instance of the driver processor handle
+   */
+  private def getDriverProcessorHandle: DriverStatefulProcessorHandleImpl = {
+    val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(timeMode)
+    driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT)
+    statefulProcessor.setHandle(driverProcessorHandle)
+    statefulProcessor.init(outputMode, timeMode)
+    driverProcessorHandle
+  }
+
+  /**
+   * Fetching the columnFamilySchemas from the StatefulProcessorHandle
+   * after init is called.
+   */
+  private def getColFamilySchemas(): Map[String, ColumnFamilySchema] = {
+    val driverProcessorHandle = getDriverProcessorHandle
+    val columnFamilySchemas = driverProcessorHandle.getColumnFamilySchemas
+    closeProcessorHandle()
+    columnFamilySchemas
+  }
+
+  private def closeProcessorHandle(): Unit = {
+    statefulProcessor.close()

Review Comment:
   It's not only closing the temporary processor handle, right? Will the 
statefulProcessor instance ever be reused (on the driver, or on the executors 
after serialization), or is it clearly isolated? It's probably better to make 
this explicit so that future readers won't be confused.
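
   For instance, a short comment here could spell out the lifecycle. A possible 
   wording (this assumes the processor instance serialized to the executors is 
   re-initialized there with the executor-side handle, rather than reusing this 
   temporary one):

   ```scala
   private def closeProcessorHandle(): Unit = {
     // The driver-side handle exists only to collect column family schemas
     // during planning. Close the processor here; the instance shipped to the
     // executors is expected to be initialized again with the executor-side
     // handle, so nothing from this temporary handle is carried over.
     statefulProcessor.close()
   }
   ```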



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

