GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21733#discussion_r206727384
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
---
@@ -201,33 +200,37 @@ object WatermarkSupport {
case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateInfo: Option[StatefulOperatorStateInfo],
+ stateFormatVersion: Int,
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {
+ private[sql] val stateManager =
StreamingAggregationStateManager.createStateManager(
+ keyExpressions, child.output, stateFormatVersion)
+
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithStateStore(
getStateInfo,
keyExpressions.toStructType,
- child.output.toStructType,
+ stateManager.getValueExpressions.toStructType,
--- End diff --
It seems you only need the schema here, not the actual expressions. So the
StateManager could return just the schema instead of the expressions.
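
A minimal sketch of that suggestion, to make the shape concrete. It uses simplified stand-ins (`Attr`, `Schema`) in place of Spark's `Attribute` and `StructType` so it runs on its own. The names `StateManagerSketch`, `getStateValueSchema`, and `create` are hypothetical, and the per-version behavior (version 1 stores the whole input row, version 2 drops the key columns) is an assumption made for illustration, not something the diff above confirms.

```scala
// Simplified stand-ins for Spark's Attribute and StructType (hypothetical).
final case class Attr(name: String, dataType: String)
final case class Schema(fields: Seq[Attr])

// The state manager exposes only the schema of the stored value,
// so callers never see the underlying expressions.
trait StateManagerSketch {
  def getStateValueSchema: Schema
}

// Assumed version 1 layout: the stored value is the full input row.
final class StateManagerV1(input: Seq[Attr]) extends StateManagerSketch {
  override def getStateValueSchema: Schema = Schema(input)
}

// Assumed version 2 layout: key columns are removed from the stored value.
final class StateManagerV2(keys: Seq[Attr], input: Seq[Attr]) extends StateManagerSketch {
  private val keyNames: Set[String] = keys.map(_.name).toSet

  override def getStateValueSchema: Schema =
    Schema(input.filterNot(attr => keyNames.contains(attr.name)))
}

object StateManagerSketch {
  // Picks an implementation from the state format version.
  def create(keys: Seq[Attr], input: Seq[Attr], stateFormatVersion: Int): StateManagerSketch =
    stateFormatVersion match {
      case 1 => new StateManagerV1(input)
      case 2 => new StateManagerV2(keys, input)
      case v => throw new IllegalArgumentException(s"Unsupported state format version: $v")
    }
}
```

With something along these lines, the call site in `doExecute()` would pass the schema from the state manager directly, rather than calling `getValueExpressions.toStructType`.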
---