Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21733#discussion_r208596844
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
    @@ -201,33 +211,37 @@ object WatermarkSupport {
     case class StateStoreRestoreExec(
         keyExpressions: Seq[Attribute],
         stateInfo: Option[StatefulOperatorStateInfo],
    +    stateFormatVersion: Int,
         child: SparkPlan)
       extends UnaryExecNode with StateStoreReader {
     
    +  private[sql] val stateManager = 
StreamingAggregationStateManager.createStateManager(
    +    keyExpressions, child.output, stateFormatVersion)
    +
       override protected def doExecute(): RDD[InternalRow] = {
         val numOutputRows = longMetric("numOutputRows")
     
         child.execute().mapPartitionsWithStateStore(
           getStateInfo,
           keyExpressions.toStructType,
    -      child.output.toStructType,
    +      stateManager.getStateValueSchema,
           indexOrdinal = None,
           sqlContext.sessionState,
           Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) 
=>
    -        val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
             val hasInput = iter.hasNext
             if (!hasInput && keyExpressions.isEmpty) {
               // If our `keyExpressions` are empty, we're getting a global 
aggregation. In that case
               // the `HashAggregateExec` will output a 0 value for the partial 
merge. We need to
               // restore the value, so that we don't overwrite our state with 
a 0 value, but rather
               // merge the 0 with existing state.
    +          // In this case the value should represent origin row, so no 
need to restore.
    --- End diff --
    
    Yes, it can be removed now. I will remove it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to