Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21733#discussion_r206784385
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
@@ -201,33 +200,37 @@ object WatermarkSupport {
case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateInfo: Option[StatefulOperatorStateInfo],
+ stateFormatVersion: Int,
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {
+  private[sql] val stateManager = StreamingAggregationStateManager.createStateManager(
+    keyExpressions, child.output, stateFormatVersion)
+
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithStateStore(
getStateInfo,
keyExpressions.toStructType,
- child.output.toStructType,
+ stateManager.getValueExpressions.toStructType,
--- End diff --
Right. Sounds like `StructType` is preferred over `Seq[Attribute]` in this
case. Will apply.

Maybe a dumb question from a newbie on Spark SQL (still trying to get familiar
with it): I guess we prefer `StructType` in this case because it's less
restrictive and also gets rid of the headache of dealing with field references.
Do I understand correctly?
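
FWIW, a minimal, illustrative sketch of the distinction (not code from this
PR; the field names are made up): a `StructType` only describes field names,
data types and nullability, whereas a `Seq[Attribute]` also carries expression
IDs bound to a specific plan, which would have to be resolved.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object SchemaSketch {
  def main(args: Array[String]): Unit = {
    // A StructType is a plain, plan-independent description of the state
    // value layout: just field names, data types and nullability.
    val stateValueSchema: StructType = StructType(Seq(
      StructField("groupKey", StringType, nullable = false),
      StructField("count", LongType, nullable = false)
    ))

    // Unlike Seq[Attribute], there are no expression IDs tied to a particular
    // physical plan, so the schema can be handed to the state store as-is.
    println(stateValueSchema.treeString)
  }
}
```

This is roughly the kind of plain schema that
`stateManager.getValueExpressions.toStructType` in the diff above ends up
passing to the state store.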
---