HeartSaVioR commented on code in PR #47574:
URL: https://github.com/apache/spark/pull/47574#discussion_r1733577154
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -52,30 +54,136 @@ class StateDataSource extends TableProvider with
DataSourceRegister {
override def shortName(): String = "statestore"
+ private var stateStoreMetadata: Option[Array[StateMetadataTableEntry]] = None
Review Comment:
I'm not sure making this instance stateful is safe. I checked some
implementations and all of them are stateless.
@cloud-fan Do you happen to know whether this is safe? This isn't
quite clear from the interface contract in TableProvider and DataSourceRegister.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -52,30 +54,136 @@ class StateDataSource extends TableProvider with
DataSourceRegister {
override def shortName(): String = "statestore"
+ private var stateStoreMetadata: Option[Array[StateMetadataTableEntry]] = None
+
+ private var keyStateEncoderSpecOpt: Option[KeyStateEncoderSpec] = None
+
+ private var transformWithStateVariableInfoOpt:
Option[TransformWithStateVariableInfo] = None
+
+ private var stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema]
= None
+
+ private def runStateVarChecks(
Review Comment:
I admit this is style-related and a debatable topic, but shall we move the
private methods below the public methods? Right now the most important methods in the
class (getTable, inferSchema) sit in between private methods and are shown as
just one of various methods. If the method scope is properly narrowed down
(tight), the scope matches the importance of the method most of the time.
Even if we don't blindly follow the well-known practice of the public -> protected
-> private sequence and want to follow another well-known practice of "grouping
related methods", I prefer to see the sequence of public -> protected ->
private within grouped methods.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -91,25 +199,45 @@ class StateDataSource extends TableProvider with
DataSourceRegister {
val storeId = new StateStoreId(stateCheckpointLocation.toString,
sourceOptions.operatorId,
partitionId, sourceOptions.storeName)
val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
- val manager = new StateSchemaCompatibilityChecker(providerId,
hadoopConf)
- val stateSchema = manager.readSchemaFile().head
- (stateSchema.keySchema, stateSchema.valueSchema)
- }
-
- if (sourceOptions.readChangeFeed) {
- new StructType()
- .add("batch_id", LongType)
- .add("change_type", StringType)
- .add("key", keySchema)
- .add("value", valueSchema)
- .add("partition_id", IntegerType)
- } else {
- new StructType()
- .add("key", keySchema)
- .add("value", valueSchema)
- .add("partition_id", IntegerType)
+ val storeMetadata = stateStoreMetadata.get
+
+ val stateVarName = sourceOptions.stateVarName
+ .getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME)
+
+ // Read the schema file path from operator metadata version v2
onwards
Review Comment:
Let's not assume that operator metadata v2 always means
transformWithState. We should only do this when the referenced operator is
transformWithState.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -17,9 +17,16 @@
package org.apache.spark.sql.execution.datasources.v2.state.utils
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow,
UnsafeRow}
+import
org.apache.spark.sql.execution.datasources.v2.state.{StateDataSourceErrors,
StateSourceOptions}
+import org.apache.spark.sql.execution.streaming.{StateVariableType,
TransformWithStateVariableInfo}
+import org.apache.spark.sql.execution.streaming.state.StateStoreColFamilySchema
+import org.apache.spark.sql.types.{DataType, IntegerType, LongType,
StringType, StructType}
+import org.apache.spark.util.ArrayImplicits._
object SchemaUtil {
+
Review Comment:
nit: not necessary?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -59,40 +64,44 @@ abstract class StatePartitionReaderBase(
hadoopConf: SerializableConfiguration,
partition: StateStoreInputPartition,
schema: StructType,
- stateStoreMetadata: Array[StateMetadataTableEntry])
+ keyStateEncoderSpec: KeyStateEncoderSpec,
+ stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
+ stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema])
extends PartitionReader[InternalRow] with Logging {
- private val keySchema = SchemaUtil.getSchemaAsDataType(schema,
"key").asInstanceOf[StructType]
- private val valueSchema = SchemaUtil.getSchemaAsDataType(schema,
"value").asInstanceOf[StructType]
+ protected val keySchema = SchemaUtil.getSchemaAsDataType(
+ schema, "key").asInstanceOf[StructType]
+ protected val valueSchema = SchemaUtil.getSchemaAsDataType(
+ schema, "value").asInstanceOf[StructType]
protected lazy val provider: StateStoreProvider = {
val stateStoreId =
StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
partition.sourceOptions.operatorId, partition.partition,
partition.sourceOptions.storeName)
val stateStoreProviderId = StateStoreProviderId(stateStoreId,
partition.queryId)
- val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
- logWarning("Metadata for state store not found, possible cause is this
checkpoint " +
- "is created by older version of spark. If the query has session window
aggregation, " +
- "the state can't be read correctly and runtime exception will be
thrown. " +
- "Run the streaming query in newer spark version to generate state
metadata " +
- "can fix the issue.")
- 0
- } else {
- require(stateStoreMetadata.length == 1)
- stateStoreMetadata.head.numColsPrefixKey
- }
- // TODO: currently we don't support RangeKeyScanStateEncoderSpec. Support
for this will be
- // added in the future along with state metadata changes.
- // Filed JIRA here: https://issues.apache.org/jira/browse/SPARK-47524
- val keyStateEncoderType = if (numColsPrefixKey > 0) {
- PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+ val useColFamilies = if (stateVariableInfoOpt.isDefined) {
+ true
} else {
- NoPrefixKeyStateEncoderSpec(keySchema)
+ false
}
- StateStoreProvider.createAndInit(
- stateStoreProviderId, keySchema, valueSchema, keyStateEncoderType,
- useColumnFamilies = false, storeConf, hadoopConf.value,
+ val provider = StateStoreProvider.createAndInit(
+ stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+ useColumnFamilies = useColFamilies, storeConf, hadoopConf.value,
useMultipleValuesPerKey = false)
+
+ if (useColFamilies) {
+ val store = provider.getStore(partition.sourceOptions.batchId + 1)
+ require(stateStoreColFamilySchemaOpt.isDefined)
+ val stateStoreColFamilySchema = stateStoreColFamilySchemaOpt.get
Review Comment:
I think it's redundant to create a column family, as we will be able to
restore all available column families during the load. Is this meant to treat the
bad case of "CF does not exist" the same as "no data"?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -91,25 +199,45 @@ class StateDataSource extends TableProvider with
DataSourceRegister {
val storeId = new StateStoreId(stateCheckpointLocation.toString,
sourceOptions.operatorId,
partitionId, sourceOptions.storeName)
val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
- val manager = new StateSchemaCompatibilityChecker(providerId,
hadoopConf)
- val stateSchema = manager.readSchemaFile().head
- (stateSchema.keySchema, stateSchema.valueSchema)
- }
-
- if (sourceOptions.readChangeFeed) {
- new StructType()
- .add("batch_id", LongType)
- .add("change_type", StringType)
- .add("key", keySchema)
- .add("value", valueSchema)
- .add("partition_id", IntegerType)
- } else {
- new StructType()
- .add("key", keySchema)
- .add("value", valueSchema)
- .add("partition_id", IntegerType)
+ val storeMetadata = stateStoreMetadata.get
+
+ val stateVarName = sourceOptions.stateVarName
+ .getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME)
+
+ // Read the schema file path from operator metadata version v2
onwards
+ val oldSchemaFilePath = if (storeMetadata.length > 0 &&
storeMetadata.head.version == 2) {
Review Comment:
Btw, that said, I think it's a lot less confusing to just set all stateful
variables from getStoreMetadataAndRunChecks().
Now there are multiple places dealing with storeMetadata to deduce further
information like transformWithStateVariableInfoOpt (there is redundant code),
and they even have some dependencies (e.g. this logic seems to actually depend
on the validation part of getStoreMetadataAndRunChecks()).
We'll need to do a major refactor if we have to make this stateless.
Why not have a "single" data class that holds all the stateful variables in
one place? If we find out that this has to be stateless, we can return the
instance of the data class from getStoreMetadataAndRunChecks() and eliminate the
variable.
That requires us to pay the redundant overhead twice (from inferSchema() and
getTable()), but if this needs to be stateless, then it's unavoidable. If this
can still be stateful, we only pay the additional cost of reading the schema file
in getTable(), which can even be removed with finer-grained tuning (I don't feel it's
worth doing).
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -30,4 +37,122 @@ object SchemaUtil {
"schema" -> schema.toString()))
}
}
+
+ private def generateSchemaForStateVar(
Review Comment:
ditto about the placement of private method
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -52,30 +54,136 @@ class StateDataSource extends TableProvider with
DataSourceRegister {
override def shortName(): String = "statestore"
+ private var stateStoreMetadata: Option[Array[StateMetadataTableEntry]] = None
+
+ private var keyStateEncoderSpecOpt: Option[KeyStateEncoderSpec] = None
+
+ private var transformWithStateVariableInfoOpt:
Option[TransformWithStateVariableInfo] = None
+
+ private var stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema]
= None
+
+ private def runStateVarChecks(
+ sourceOptions: StateSourceOptions,
+ stateStoreMetadata: Array[StateMetadataTableEntry]): Unit = {
+ val twsShortName = "transformWithStateExec"
+ if (sourceOptions.stateVarName.isDefined) {
+ // Perform checks for transformWithState operator in case state variable
name is provided
+ require(stateStoreMetadata.size == 1)
+ val opMetadata = stateStoreMetadata.head
+ if (opMetadata.operatorName != twsShortName) {
+ // if we are trying to query state source with state variable name,
then the operator
+ // should be transformWithState
+ val errorMsg = "Providing state variable names is only supported with
the " +
+ s"transformWithState operator. Found
operator=${opMetadata.operatorName}. " +
+ s"Please remove this option and re-run the query."
+ throw StateDataSourceErrors.invalidOptionValue(STATE_VAR_NAME,
errorMsg)
+ }
+
+ // if the operator is transformWithState, but the operator properties
are empty, then
+ // the user has not defined any state variables for the operator
+ val operatorProperties = opMetadata.operatorPropertiesJson
+ if (operatorProperties.isEmpty) {
+ throw StateDataSourceErrors.invalidOptionValue(STATE_VAR_NAME,
+ "No state variable names are defined for the transformWithState
operator")
+ }
+
+ // if the state variable is not one of the defined/available state
variables, then we
+ // fail the query
+ val stateVarName = sourceOptions.stateVarName.get
+ val twsOperatorProperties =
TransformWithStateOperatorProperties.fromJson(operatorProperties)
+ val stateVars = twsOperatorProperties.stateVariables
+ if (stateVars.filter(stateVar => stateVar.stateName ==
stateVarName).size != 1) {
+ throw StateDataSourceErrors.invalidOptionValue(STATE_VAR_NAME,
+ s"State variable $stateVarName is not defined for the
transformWithState operator.")
+ }
+ } else {
+ // if the operator is transformWithState, then a state variable argument
is mandatory
+ if (stateStoreMetadata.size == 1 &&
+ stateStoreMetadata.head.operatorName == twsShortName) {
+ throw StateDataSourceErrors.requiredOptionUnspecified("stateVarName")
+ }
+ }
+ }
+
+ private def getStateStoreMetadata(stateSourceOptions: StateSourceOptions):
+ Array[StateMetadataTableEntry] = {
+ val allStateStoreMetadata = new StateMetadataPartitionReader(
+ stateSourceOptions.stateCheckpointLocation.getParent.toString,
+ serializedHadoopConf, stateSourceOptions.batchId).stateMetadata.toArray
+ val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
+ entry.operatorId == stateSourceOptions.operatorId &&
+ entry.stateStoreName == stateSourceOptions.storeName
+ }
+ stateStoreMetadata
+ }
+
+ private def getStoreMetadataAndRunChecks(stateSourceOptions:
StateSourceOptions): Unit = {
+ if (stateStoreMetadata.isEmpty) {
+ stateStoreMetadata = Some(getStateStoreMetadata(stateSourceOptions))
+ runStateVarChecks(stateSourceOptions, stateStoreMetadata.get)
+ }
+ }
+
override def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
val sourceOptions = StateSourceOptions.apply(session, hadoopConf,
properties)
val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation,
sourceOptions.batchId)
- // Read the operator metadata once to see if we can find the information
for prefix scan
- // encoder used in session window aggregation queries.
- val allStateStoreMetadata = new StateMetadataPartitionReader(
- sourceOptions.stateCheckpointLocation.getParent.toString,
serializedHadoopConf,
- sourceOptions.batchId)
- .stateMetadata.toArray
- val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
- entry.operatorId == sourceOptions.operatorId &&
- entry.stateStoreName == sourceOptions.storeName
+ getStoreMetadataAndRunChecks(sourceOptions)
+
+ // The key state encoder spec should be available for all operators except
stream-stream joins
+ val keyStateEncoderSpec = if (keyStateEncoderSpecOpt.isDefined) {
Review Comment:
This looks like it depends on the sequence of method calls - this might break
if inferSchema() is not called beforehand, and this doesn't distinguish
between `inferSchema() isn't called` and `inferSchema() sets the variable to None
or skips setting the variable`.
If we want to depend on the sequence, let's have a flag to assert it. We can
throw an exception to block users from specifying the schema directly (which causes
inferSchema() to not be called).
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -30,4 +37,122 @@ object SchemaUtil {
"schema" -> schema.toString()))
}
}
+
+ private def generateSchemaForStateVar(
+ stateVarInfo: TransformWithStateVariableInfo,
+ stateStoreColFamilySchema: StateStoreColFamilySchema): StructType = {
+ val stateVarType = stateVarInfo.stateVariableType
+ val hasTTLEnabled = stateVarInfo.ttlEnabled
+
+ stateVarType match {
+ case StateVariableType.ValueState =>
+ if (hasTTLEnabled) {
+ val ttlValueSchema = SchemaUtil.getSchemaAsDataType(
+ stateStoreColFamilySchema.valueSchema,
"value").asInstanceOf[StructType]
+ new StructType()
+ .add("key", stateStoreColFamilySchema.keySchema)
+ .add("value", ttlValueSchema)
+ .add("expiration_timestamp", LongType)
+ .add("partition_id", IntegerType)
+ } else {
+ new StructType()
+ .add("key", stateStoreColFamilySchema.keySchema)
+ .add("value", stateStoreColFamilySchema.valueSchema)
+ .add("partition_id", IntegerType)
+ }
+
+ case _ =>
+ throw StateDataSourceErrors.internalError(s"Unsupported state variable
type $stateVarType")
+ }
+ }
+
+ def getSourceSchema(
+ sourceOptions: StateSourceOptions,
+ keySchema: StructType,
+ valueSchema: StructType,
+ transformWithStateVariableInfoOpt:
Option[TransformWithStateVariableInfo],
+ stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema]):
StructType = {
+ if (sourceOptions.readChangeFeed) {
Review Comment:
Are "read change feed" and "read from transformWithState" mutually
exclusive? If so, have we blocked the case of specifying both options?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -146,21 +158,40 @@ class StatePartitionReader(
}
override lazy val iter: Iterator[InternalRow] = {
- store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
+ val stateVarName = stateVariableInfoOpt
+ .map(_.stateName).getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME)
+ store
+ .iterator(stateVarName)
+ .map(pair =>
Review Comment:
nit: `{ pair =>`. Let's use `{` for multi-line bodies, which is more consistent
with existing code.
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -74,7 +74,7 @@ class RunningCountStatefulProcessorWithTTL
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
_countState = getHandle.getValueState[Long]("countState",
- Encoders.scalaLong, TTLConfig(Duration.ofMillis(1000)))
+ Encoders.scalaLong, TTLConfig(Duration.ofMillis(30000)))
Review Comment:
It's a huge change to the TTL value - if the existing test does not fail
with this change, it sounds to me like the existing test is too loose.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -205,26 +207,35 @@ class StateMetadataPartitionReader(
// Need this to be accessible from IncrementalExecution for the planning
rule.
private[sql] def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
- val stateDir = new Path(checkpointLocation, "state")
- val opIds = fileManager
- .list(stateDir, pathNameCanBeParsedAsLongFilter).map(f =>
pathToLong(f.getPath)).sorted
- opIds.map { opId =>
- val operatorIdPath = new Path(stateDir, opId.toString)
- // check if OperatorStateMetadataV2 path exists, if it does, read it
- // otherwise, fall back to OperatorStateMetadataV1
- val operatorStateMetadataV2Path =
OperatorStateMetadataV2.metadataDirPath(operatorIdPath)
- val operatorStateMetadataVersion = if
(fileManager.exists(operatorStateMetadataV2Path)) {
- 2
- } else {
- 1
- }
-
- OperatorStateMetadataReader.createReader(
- operatorIdPath, hadoopConf, operatorStateMetadataVersion,
batchId).read() match {
- case Some(metadata) => metadata
- case None => throw
StateDataSourceErrors.failedToReadOperatorMetadata(checkpointLocation,
- batchId)
+ try {
+ val stateDir = new Path(checkpointLocation, "state")
+ val opIds = fileManager
+ .list(stateDir, pathNameCanBeParsedAsLongFilter).map(f =>
pathToLong(f.getPath)).sorted
+ opIds.map { opId =>
+ val operatorIdPath = new Path(stateDir, opId.toString)
+ // check if OperatorStateMetadataV2 path exists, if it does, read it
+ // otherwise, fall back to OperatorStateMetadataV1
+ val operatorStateMetadataV2Path =
OperatorStateMetadataV2.metadataDirPath(operatorIdPath)
+ val operatorStateMetadataVersion = if
(fileManager.exists(operatorStateMetadataV2Path)) {
+ 2
+ } else {
+ 1
+ }
+ OperatorStateMetadataReader.createReader(
+ operatorIdPath, hadoopConf, operatorStateMetadataVersion,
batchId).read() match {
+ case Some(metadata) => metadata
+ case None => throw
StateDataSourceErrors.failedToReadOperatorMetadata(checkpointLocation,
+ batchId)
+ }
}
+ } catch {
+ // if the operator metadata is not present, catch the exception
Review Comment:
What's the intention of this change? Does this have any impact on the planning
phase of IncrementalExecution?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -30,4 +37,122 @@ object SchemaUtil {
"schema" -> schema.toString()))
}
}
+
+ private def generateSchemaForStateVar(
+ stateVarInfo: TransformWithStateVariableInfo,
+ stateStoreColFamilySchema: StateStoreColFamilySchema): StructType = {
+ val stateVarType = stateVarInfo.stateVariableType
+ val hasTTLEnabled = stateVarInfo.ttlEnabled
+
+ stateVarType match {
+ case StateVariableType.ValueState =>
+ if (hasTTLEnabled) {
+ val ttlValueSchema = SchemaUtil.getSchemaAsDataType(
+ stateStoreColFamilySchema.valueSchema,
"value").asInstanceOf[StructType]
+ new StructType()
+ .add("key", stateStoreColFamilySchema.keySchema)
+ .add("value", ttlValueSchema)
+ .add("expiration_timestamp", LongType)
+ .add("partition_id", IntegerType)
+ } else {
+ new StructType()
+ .add("key", stateStoreColFamilySchema.keySchema)
+ .add("value", stateStoreColFamilySchema.valueSchema)
+ .add("partition_id", IntegerType)
+ }
+
+ case _ =>
+ throw StateDataSourceErrors.internalError(s"Unsupported state variable
type $stateVarType")
+ }
+ }
+
+ def getSourceSchema(
+ sourceOptions: StateSourceOptions,
+ keySchema: StructType,
+ valueSchema: StructType,
+ transformWithStateVariableInfoOpt:
Option[TransformWithStateVariableInfo],
+ stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema]):
StructType = {
+ if (sourceOptions.readChangeFeed) {
+ new StructType()
+ .add("batch_id", LongType)
+ .add("change_type", StringType)
+ .add("key", keySchema)
+ .add("value", valueSchema)
+ .add("partition_id", IntegerType)
+ } else if (transformWithStateVariableInfoOpt.isDefined) {
+ require(stateStoreColFamilySchemaOpt.isDefined)
+ generateSchemaForStateVar(transformWithStateVariableInfoOpt.get,
+ stateStoreColFamilySchemaOpt.get)
+ } else {
+ new StructType()
+ .add("key", keySchema)
+ .add("value", valueSchema)
+ .add("partition_id", IntegerType)
+ }
+ }
+
+ def unifyStateRowPair(pair: (UnsafeRow, UnsafeRow), partition: Int):
InternalRow = {
+ val row = new GenericInternalRow(3)
+ row.update(0, pair._1)
+ row.update(1, pair._2)
+ row.update(2, partition)
+ row
+ }
+
+ def unifyStateRowPairWithTTL(
+ pair: (UnsafeRow, UnsafeRow),
+ valueSchema: StructType,
+ partition: Int): InternalRow = {
+ val row = new GenericInternalRow(4)
+ row.update(0, pair._1)
+ row.update(1, pair._2.get(0, valueSchema))
+ row.update(2, pair._2.get(1, LongType))
+ row.update(3, partition)
+ row
+ }
+
+ private val expectedTypes = Map(
Review Comment:
nit: either inline this or explicitly follow the style guidance for constants,
e.g. EXPECTED_COLUMN_TYPES or so.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -268,6 +268,25 @@ class StateDataSourceNegativeTestSuite extends
StateDataSourceTestBase {
"message" -> s"value should be less than or equal to $endBatchId"))
}
}
+
+ test("ERROR: trying to specify state variable name with " +
+ s"non-transformWithState operator") {
Review Comment:
nit: no need to have `s` before the string.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]