HeartSaVioR commented on code in PR #47574:
URL: https://github.com/apache/spark/pull/47574#discussion_r1733577154


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -52,30 +54,136 @@ class StateDataSource extends TableProvider with 
DataSourceRegister {
 
   override def shortName(): String = "statestore"
 
+  private var stateStoreMetadata: Option[Array[StateMetadataTableEntry]] = None

Review Comment:
   I'm not sure making this instance stateful is safe. I checked some 
implementations and all of them are stateless.
   
   @cloud-fan Do you happen to know whether this is safe? This isn't 
quite clear from the interface contract in TableProvider and DataSourceRegister.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -52,30 +54,136 @@ class StateDataSource extends TableProvider with 
DataSourceRegister {
 
   override def shortName(): String = "statestore"
 
+  private var stateStoreMetadata: Option[Array[StateMetadataTableEntry]] = None
+
+  private var keyStateEncoderSpecOpt: Option[KeyStateEncoderSpec] = None
+
+  private var transformWithStateVariableInfoOpt: 
Option[TransformWithStateVariableInfo] = None
+
+  private var stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema] 
= None
+
+  private def runStateVarChecks(

Review Comment:
   I admit this is style related and an arguable topic, but shall we move the 
private methods below the public methods? Right now the most important methods 
in the class (getTable, inferSchema) sit in between private methods and appear 
as just one of various methods. If the method scope is properly narrowed down 
(tight), the scope matches the importance of the method most of the time.
   
   Even if we don't blindly follow the well-known practice of the public -> 
protected -> private sequence and want to follow another well-known practice of 
"grouping related methods", I prefer to see the sequence of public -> protected 
-> private within grouped methods.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -91,25 +199,45 @@ class StateDataSource extends TableProvider with 
DataSourceRegister {
           val storeId = new StateStoreId(stateCheckpointLocation.toString, 
sourceOptions.operatorId,
             partitionId, sourceOptions.storeName)
           val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
-          val manager = new StateSchemaCompatibilityChecker(providerId, 
hadoopConf)
-          val stateSchema = manager.readSchemaFile().head
-          (stateSchema.keySchema, stateSchema.valueSchema)
-      }
-
-      if (sourceOptions.readChangeFeed) {
-        new StructType()
-          .add("batch_id", LongType)
-          .add("change_type", StringType)
-          .add("key", keySchema)
-          .add("value", valueSchema)
-          .add("partition_id", IntegerType)
-      } else {
-        new StructType()
-          .add("key", keySchema)
-          .add("value", valueSchema)
-          .add("partition_id", IntegerType)
+          val storeMetadata = stateStoreMetadata.get
+
+          val stateVarName = sourceOptions.stateVarName
+            .getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME)
+
+          // Read the schema file path from operator metadata version v2 
onwards

Review Comment:
   Let's not assume that operator metadata v2 always means 
transformWithState. We should only do this when the referenced operator is 
transformWithState.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -17,9 +17,16 @@
 package org.apache.spark.sql.execution.datasources.v2.state.utils
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeRow}
+import 
org.apache.spark.sql.execution.datasources.v2.state.{StateDataSourceErrors, 
StateSourceOptions}
+import org.apache.spark.sql.execution.streaming.{StateVariableType, 
TransformWithStateVariableInfo}
+import org.apache.spark.sql.execution.streaming.state.StateStoreColFamilySchema
+import org.apache.spark.sql.types.{DataType, IntegerType, LongType, 
StringType, StructType}
+import org.apache.spark.util.ArrayImplicits._
 
 object SchemaUtil {
+

Review Comment:
   nit: not necessary?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -59,40 +64,44 @@ abstract class StatePartitionReaderBase(
     hadoopConf: SerializableConfiguration,
     partition: StateStoreInputPartition,
     schema: StructType,
-    stateStoreMetadata: Array[StateMetadataTableEntry])
+    keyStateEncoderSpec: KeyStateEncoderSpec,
+    stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
+    stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema])
   extends PartitionReader[InternalRow] with Logging {
-  private val keySchema = SchemaUtil.getSchemaAsDataType(schema, 
"key").asInstanceOf[StructType]
-  private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, 
"value").asInstanceOf[StructType]
+  protected val keySchema = SchemaUtil.getSchemaAsDataType(
+    schema, "key").asInstanceOf[StructType]
+  protected val valueSchema = SchemaUtil.getSchemaAsDataType(
+    schema, "value").asInstanceOf[StructType]
 
   protected lazy val provider: StateStoreProvider = {
     val stateStoreId = 
StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
       partition.sourceOptions.operatorId, partition.partition, 
partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, 
partition.queryId)
-    val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
-      logWarning("Metadata for state store not found, possible cause is this 
checkpoint " +
-        "is created by older version of spark. If the query has session window 
aggregation, " +
-        "the state can't be read correctly and runtime exception will be 
thrown. " +
-        "Run the streaming query in newer spark version to generate state 
metadata " +
-        "can fix the issue.")
-      0
-    } else {
-      require(stateStoreMetadata.length == 1)
-      stateStoreMetadata.head.numColsPrefixKey
-    }
 
-    // TODO: currently we don't support RangeKeyScanStateEncoderSpec. Support 
for this will be
-    // added in the future along with state metadata changes.
-    // Filed JIRA here: https://issues.apache.org/jira/browse/SPARK-47524
-    val keyStateEncoderType = if (numColsPrefixKey > 0) {
-      PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+    val useColFamilies = if (stateVariableInfoOpt.isDefined) {
+      true
     } else {
-      NoPrefixKeyStateEncoderSpec(keySchema)
+      false
     }
 
-    StateStoreProvider.createAndInit(
-      stateStoreProviderId, keySchema, valueSchema, keyStateEncoderType,
-      useColumnFamilies = false, storeConf, hadoopConf.value,
+    val provider = StateStoreProvider.createAndInit(
+      stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+      useColumnFamilies = useColFamilies, storeConf, hadoopConf.value,
       useMultipleValuesPerKey = false)
+
+    if (useColFamilies) {
+      val store = provider.getStore(partition.sourceOptions.batchId + 1)
+      require(stateStoreColFamilySchemaOpt.isDefined)
+      val stateStoreColFamilySchema = stateStoreColFamilySchemaOpt.get

Review Comment:
   I think it's redundant to create a column family, as we will be able to 
restore all available column families during the load. Is this meant to treat 
the bad case of "CF does not exist" the same as "no data"?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -91,25 +199,45 @@ class StateDataSource extends TableProvider with 
DataSourceRegister {
           val storeId = new StateStoreId(stateCheckpointLocation.toString, 
sourceOptions.operatorId,
             partitionId, sourceOptions.storeName)
           val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
-          val manager = new StateSchemaCompatibilityChecker(providerId, 
hadoopConf)
-          val stateSchema = manager.readSchemaFile().head
-          (stateSchema.keySchema, stateSchema.valueSchema)
-      }
-
-      if (sourceOptions.readChangeFeed) {
-        new StructType()
-          .add("batch_id", LongType)
-          .add("change_type", StringType)
-          .add("key", keySchema)
-          .add("value", valueSchema)
-          .add("partition_id", IntegerType)
-      } else {
-        new StructType()
-          .add("key", keySchema)
-          .add("value", valueSchema)
-          .add("partition_id", IntegerType)
+          val storeMetadata = stateStoreMetadata.get
+
+          val stateVarName = sourceOptions.stateVarName
+            .getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME)
+
+          // Read the schema file path from operator metadata version v2 
onwards
+          val oldSchemaFilePath = if (storeMetadata.length > 0 && 
storeMetadata.head.version == 2) {

Review Comment:
   Btw, that said, I think it's a lot less confusing to just set all stateful 
variables from getStoreMetadataAndRunChecks().
   
   Now there are multiple places dealing with storeMetadata to deduce further 
information like transformWithStateVariableInfoOpt (there is redundant code), 
and they even have some dependencies (e.g. this logic seems to actually depend 
on the validation part of getStoreMetadataAndRunChecks()).
   
   We'll need to do a major refactor if we have to make this stateless. 
Why not have a "single" data class that contains all the stateful variables? 
If we learn that this has to be stateless, we can return the instance of the 
data class from getStoreMetadataAndRunChecks() and eliminate the variable.
   
   It requires us to pay the redundant overhead twice (from inferSchema() and 
getTable()), but if this needs to be stateless, then it's unavoidable. If this 
can still be stateful, we only pay the additional cost of reading the schema 
file in getTable(), which can even be removed with finer-grained tuning (I 
don't feel it's worth doing).



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -30,4 +37,122 @@ object SchemaUtil {
           "schema" -> schema.toString()))
     }
   }
+
+  private def generateSchemaForStateVar(

Review Comment:
   ditto about the placement of private method



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -52,30 +54,136 @@ class StateDataSource extends TableProvider with 
DataSourceRegister {
 
   override def shortName(): String = "statestore"
 
+  private var stateStoreMetadata: Option[Array[StateMetadataTableEntry]] = None
+
+  private var keyStateEncoderSpecOpt: Option[KeyStateEncoderSpec] = None
+
+  private var transformWithStateVariableInfoOpt: 
Option[TransformWithStateVariableInfo] = None
+
+  private var stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema] 
= None
+
+  private def runStateVarChecks(
+      sourceOptions: StateSourceOptions,
+      stateStoreMetadata: Array[StateMetadataTableEntry]): Unit = {
+    val twsShortName = "transformWithStateExec"
+    if (sourceOptions.stateVarName.isDefined) {
+      // Perform checks for transformWithState operator in case state variable 
name is provided
+      require(stateStoreMetadata.size == 1)
+      val opMetadata = stateStoreMetadata.head
+      if (opMetadata.operatorName != twsShortName) {
+        // if we are trying to query state source with state variable name, 
then the operator
+        // should be transformWithState
+        val errorMsg = "Providing state variable names is only supported with 
the " +
+          s"transformWithState operator. Found 
operator=${opMetadata.operatorName}. " +
+          s"Please remove this option and re-run the query."
+        throw StateDataSourceErrors.invalidOptionValue(STATE_VAR_NAME, 
errorMsg)
+      }
+
+      // if the operator is transformWithState, but the operator properties 
are empty, then
+      // the user has not defined any state variables for the operator
+      val operatorProperties = opMetadata.operatorPropertiesJson
+      if (operatorProperties.isEmpty) {
+        throw StateDataSourceErrors.invalidOptionValue(STATE_VAR_NAME,
+          "No state variable names are defined for the transformWithState 
operator")
+      }
+
+      // if the state variable is not one of the defined/available state 
variables, then we
+      // fail the query
+      val stateVarName = sourceOptions.stateVarName.get
+      val twsOperatorProperties = 
TransformWithStateOperatorProperties.fromJson(operatorProperties)
+      val stateVars = twsOperatorProperties.stateVariables
+      if (stateVars.filter(stateVar => stateVar.stateName == 
stateVarName).size != 1) {
+        throw StateDataSourceErrors.invalidOptionValue(STATE_VAR_NAME,
+          s"State variable $stateVarName is not defined for the 
transformWithState operator.")
+      }
+    } else {
+      // if the operator is transformWithState, then a state variable argument 
is mandatory
+      if (stateStoreMetadata.size == 1 &&
+        stateStoreMetadata.head.operatorName == twsShortName) {
+        throw StateDataSourceErrors.requiredOptionUnspecified("stateVarName")
+      }
+    }
+  }
+
+  private def getStateStoreMetadata(stateSourceOptions: StateSourceOptions):
+    Array[StateMetadataTableEntry] = {
+    val allStateStoreMetadata = new StateMetadataPartitionReader(
+      stateSourceOptions.stateCheckpointLocation.getParent.toString,
+      serializedHadoopConf, stateSourceOptions.batchId).stateMetadata.toArray
+    val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
+      entry.operatorId == stateSourceOptions.operatorId &&
+        entry.stateStoreName == stateSourceOptions.storeName
+    }
+    stateStoreMetadata
+  }
+
+  private def getStoreMetadataAndRunChecks(stateSourceOptions: 
StateSourceOptions): Unit = {
+    if (stateStoreMetadata.isEmpty) {
+      stateStoreMetadata = Some(getStateStoreMetadata(stateSourceOptions))
+      runStateVarChecks(stateSourceOptions, stateStoreMetadata.get)
+    }
+  }
+
   override def getTable(
       schema: StructType,
       partitioning: Array[Transform],
       properties: util.Map[String, String]): Table = {
     val sourceOptions = StateSourceOptions.apply(session, hadoopConf, 
properties)
     val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation, 
sourceOptions.batchId)
-    // Read the operator metadata once to see if we can find the information 
for prefix scan
-    // encoder used in session window aggregation queries.
-    val allStateStoreMetadata = new StateMetadataPartitionReader(
-      sourceOptions.stateCheckpointLocation.getParent.toString, 
serializedHadoopConf,
-      sourceOptions.batchId)
-      .stateMetadata.toArray
-    val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
-      entry.operatorId == sourceOptions.operatorId &&
-        entry.stateStoreName == sourceOptions.storeName
+    getStoreMetadataAndRunChecks(sourceOptions)
+
+    // The key state encoder spec should be available for all operators except 
stream-stream joins
+    val keyStateEncoderSpec = if (keyStateEncoderSpecOpt.isDefined) {

Review Comment:
   This looks like it depends on the sequence of method calls - this might 
break if inferSchema() is not called beforehand, and it doesn't distinguish 
between the cases `inferSchema() isn't called` and `inferSchema sets the 
variable to None or skips setting the variable`.
   
   If we want to depend on the sequence, let's have a flag to assert it. We can 
throw an exception to block users from specifying the schema directly (which 
leads to inferSchema() not being called).



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -30,4 +37,122 @@ object SchemaUtil {
           "schema" -> schema.toString()))
     }
   }
+
+  private def generateSchemaForStateVar(
+      stateVarInfo: TransformWithStateVariableInfo,
+      stateStoreColFamilySchema: StateStoreColFamilySchema): StructType = {
+    val stateVarType = stateVarInfo.stateVariableType
+    val hasTTLEnabled = stateVarInfo.ttlEnabled
+
+    stateVarType match {
+      case StateVariableType.ValueState =>
+        if (hasTTLEnabled) {
+          val ttlValueSchema = SchemaUtil.getSchemaAsDataType(
+            stateStoreColFamilySchema.valueSchema, 
"value").asInstanceOf[StructType]
+          new StructType()
+            .add("key", stateStoreColFamilySchema.keySchema)
+            .add("value", ttlValueSchema)
+            .add("expiration_timestamp", LongType)
+            .add("partition_id", IntegerType)
+        } else {
+          new StructType()
+            .add("key", stateStoreColFamilySchema.keySchema)
+            .add("value", stateStoreColFamilySchema.valueSchema)
+            .add("partition_id", IntegerType)
+        }
+
+      case _ =>
+        throw StateDataSourceErrors.internalError(s"Unsupported state variable 
type $stateVarType")
+    }
+  }
+
+  def getSourceSchema(
+      sourceOptions: StateSourceOptions,
+      keySchema: StructType,
+      valueSchema: StructType,
+      transformWithStateVariableInfoOpt: 
Option[TransformWithStateVariableInfo],
+      stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema]): 
StructType = {
+    if (sourceOptions.readChangeFeed) {

Review Comment:
   Are "read change feed" and "read from transformWithState" mutually 
exclusive? If so, have we blocked the case of specifying both options?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -146,21 +158,40 @@ class StatePartitionReader(
   }
 
   override lazy val iter: Iterator[InternalRow] = {
-    store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
+    val stateVarName = stateVariableInfoOpt
+      .map(_.stateName).getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME)
+    store
+      .iterator(stateVarName)
+      .map(pair =>

Review Comment:
   nit: `{ pair =>`. Let's use `{` for multi-line lambdas, which is more 
consistent with existing code.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -74,7 +74,7 @@ class RunningCountStatefulProcessorWithTTL
       outputMode: OutputMode,
       timeMode: TimeMode): Unit = {
     _countState = getHandle.getValueState[Long]("countState",
-      Encoders.scalaLong, TTLConfig(Duration.ofMillis(1000)))
+      Encoders.scalaLong, TTLConfig(Duration.ofMillis(30000)))

Review Comment:
   It's a huge change in the TTL value - if the existing test does not fail 
with this change, it sounds to me like the existing test is too loose.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -205,26 +207,35 @@ class StateMetadataPartitionReader(
 
   // Need this to be accessible from IncrementalExecution for the planning 
rule.
   private[sql] def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
-    val stateDir = new Path(checkpointLocation, "state")
-    val opIds = fileManager
-      .list(stateDir, pathNameCanBeParsedAsLongFilter).map(f => 
pathToLong(f.getPath)).sorted
-    opIds.map { opId =>
-      val operatorIdPath = new Path(stateDir, opId.toString)
-      // check if OperatorStateMetadataV2 path exists, if it does, read it
-      // otherwise, fall back to OperatorStateMetadataV1
-      val operatorStateMetadataV2Path = 
OperatorStateMetadataV2.metadataDirPath(operatorIdPath)
-      val operatorStateMetadataVersion = if 
(fileManager.exists(operatorStateMetadataV2Path)) {
-        2
-      } else {
-        1
-      }
-
-      OperatorStateMetadataReader.createReader(
-        operatorIdPath, hadoopConf, operatorStateMetadataVersion, 
batchId).read() match {
-        case Some(metadata) => metadata
-        case None => throw 
StateDataSourceErrors.failedToReadOperatorMetadata(checkpointLocation,
-          batchId)
+    try {
+      val stateDir = new Path(checkpointLocation, "state")
+      val opIds = fileManager
+        .list(stateDir, pathNameCanBeParsedAsLongFilter).map(f => 
pathToLong(f.getPath)).sorted
+      opIds.map { opId =>
+        val operatorIdPath = new Path(stateDir, opId.toString)
+        // check if OperatorStateMetadataV2 path exists, if it does, read it
+        // otherwise, fall back to OperatorStateMetadataV1
+        val operatorStateMetadataV2Path = 
OperatorStateMetadataV2.metadataDirPath(operatorIdPath)
+        val operatorStateMetadataVersion = if 
(fileManager.exists(operatorStateMetadataV2Path)) {
+          2
+        } else {
+          1
+        }
+        OperatorStateMetadataReader.createReader(
+          operatorIdPath, hadoopConf, operatorStateMetadataVersion, 
batchId).read() match {
+          case Some(metadata) => metadata
+          case None => throw 
StateDataSourceErrors.failedToReadOperatorMetadata(checkpointLocation,
+            batchId)
+        }
       }
+    } catch {
+      // if the operator metadata is not present, catch the exception

Review Comment:
   What's the intention of this change? Does this have any impact on the 
planning phase of IncrementalExecution?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -30,4 +37,122 @@ object SchemaUtil {
           "schema" -> schema.toString()))
     }
   }
+
+  private def generateSchemaForStateVar(
+      stateVarInfo: TransformWithStateVariableInfo,
+      stateStoreColFamilySchema: StateStoreColFamilySchema): StructType = {
+    val stateVarType = stateVarInfo.stateVariableType
+    val hasTTLEnabled = stateVarInfo.ttlEnabled
+
+    stateVarType match {
+      case StateVariableType.ValueState =>
+        if (hasTTLEnabled) {
+          val ttlValueSchema = SchemaUtil.getSchemaAsDataType(
+            stateStoreColFamilySchema.valueSchema, 
"value").asInstanceOf[StructType]
+          new StructType()
+            .add("key", stateStoreColFamilySchema.keySchema)
+            .add("value", ttlValueSchema)
+            .add("expiration_timestamp", LongType)
+            .add("partition_id", IntegerType)
+        } else {
+          new StructType()
+            .add("key", stateStoreColFamilySchema.keySchema)
+            .add("value", stateStoreColFamilySchema.valueSchema)
+            .add("partition_id", IntegerType)
+        }
+
+      case _ =>
+        throw StateDataSourceErrors.internalError(s"Unsupported state variable 
type $stateVarType")
+    }
+  }
+
+  def getSourceSchema(
+      sourceOptions: StateSourceOptions,
+      keySchema: StructType,
+      valueSchema: StructType,
+      transformWithStateVariableInfoOpt: 
Option[TransformWithStateVariableInfo],
+      stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema]): 
StructType = {
+    if (sourceOptions.readChangeFeed) {
+      new StructType()
+        .add("batch_id", LongType)
+        .add("change_type", StringType)
+        .add("key", keySchema)
+        .add("value", valueSchema)
+        .add("partition_id", IntegerType)
+    } else if (transformWithStateVariableInfoOpt.isDefined) {
+      require(stateStoreColFamilySchemaOpt.isDefined)
+      generateSchemaForStateVar(transformWithStateVariableInfoOpt.get,
+        stateStoreColFamilySchemaOpt.get)
+    } else {
+      new StructType()
+        .add("key", keySchema)
+        .add("value", valueSchema)
+        .add("partition_id", IntegerType)
+    }
+  }
+
+  def unifyStateRowPair(pair: (UnsafeRow, UnsafeRow), partition: Int): 
InternalRow = {
+    val row = new GenericInternalRow(3)
+    row.update(0, pair._1)
+    row.update(1, pair._2)
+    row.update(2, partition)
+    row
+  }
+
+  def unifyStateRowPairWithTTL(
+      pair: (UnsafeRow, UnsafeRow),
+      valueSchema: StructType,
+      partition: Int): InternalRow = {
+    val row = new GenericInternalRow(4)
+    row.update(0, pair._1)
+    row.update(1, pair._2.get(0, valueSchema))
+    row.update(2, pair._2.get(1, LongType))
+    row.update(3, partition)
+    row
+  }
+
+  private val expectedTypes = Map(

Review Comment:
   nit: inline this, or explicitly follow the style guidance for constants, 
e.g. EXPECTED_COLUMN_TYPES or so.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -268,6 +268,25 @@ class StateDataSourceNegativeTestSuite extends 
StateDataSourceTestBase {
           "message" -> s"value should be less than or equal to $endBatchId"))
     }
   }
+
+  test("ERROR: trying to specify state variable name with " +
+    s"non-transformWithState operator") {

Review Comment:
   nit: no need to have `s` before the string.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to