HeartSaVioR commented on code in PR #47035:
URL: https://github.com/apache/spark/pull/47035#discussion_r1650477997
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -103,10 +130,85 @@ class StateSchemaCompatibilityChecker(
}
}
+ def validateAndMaybeEvolveSchema(
Review Comment:
If we are going to replace all calls to `check` with calls to this one, please
ensure that the `check` method is removed.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -767,6 +767,35 @@ object SymmetricHashJoinStateManager {
}
}
+ def getSchemaForStateStores(
+ joinSide: JoinSide,
+ inputValueAttributes: Seq[Attribute],
+ joinKeys: Seq[Expression],
+ stateFormatVersion: Int): scala.collection.mutable.Map[String,
(StructType, StructType)] = {
Review Comment:
Is the caller expected to modify the result? If not, let's simply return an
immutable Map.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -424,6 +430,11 @@ case class StateStoreRestoreExec(
private[sql] val stateManager =
StreamingAggregationStateManager.createStateManager(
keyExpressions, child.output, stateFormatVersion)
+ override def validateAndMaybeEvolveSchema(hadoopConf: Configuration): Unit =
{
Review Comment:
`StateStoreRestoreExec` doesn't need this override.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -70,6 +72,10 @@ trait StatefulOperator extends SparkPlan {
throw new IllegalStateException("State location not present for
execution")
}
}
+
+ // Function used to record state schema for the first time and validate it against proposed
+ // schema changes in the future. Runs as part of a planning rule on the driver.
+ def validateAndMaybeEvolveSchema(hadoopConf: Configuration): Unit
Review Comment:
The name is ambiguous: which schema is the target? An operator has multiple
aspects of schema. Let's say `State`Schema (i.e.
`validateAndMaybeEvolveStateSchema`).
Also, if you add this in StatefulOperator, streaming aggregation will have
this in both places, restore & save. StateStoreWriter is probably the better
place.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]