Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21733#discussion_r206727384
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
    @@ -201,33 +200,37 @@ object WatermarkSupport {
     case class StateStoreRestoreExec(
         keyExpressions: Seq[Attribute],
         stateInfo: Option[StatefulOperatorStateInfo],
    +    stateFormatVersion: Int,
         child: SparkPlan)
       extends UnaryExecNode with StateStoreReader {
     
    +  private[sql] val stateManager = 
StreamingAggregationStateManager.createStateManager(
    +    keyExpressions, child.output, stateFormatVersion)
    +
       override protected def doExecute(): RDD[InternalRow] = {
         val numOutputRows = longMetric("numOutputRows")
     
         child.execute().mapPartitionsWithStateStore(
           getStateInfo,
           keyExpressions.toStructType,
    -      child.output.toStructType,
    +      stateManager.getValueExpressions.toStructType,
    --- End diff --
    
    Seems like you need to only get the schema, not the actual expressions. So 
the StateManager can only return the schema and not the expressions. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to