Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21733#discussion_r208596844
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
---
@@ -201,33 +211,37 @@ object WatermarkSupport {
case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateInfo: Option[StatefulOperatorStateInfo],
+ stateFormatVersion: Int,
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {
+ private[sql] val stateManager =
StreamingAggregationStateManager.createStateManager(
+ keyExpressions, child.output, stateFormatVersion)
+
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithStateStore(
getStateInfo,
keyExpressions.toStructType,
- child.output.toStructType,
+ stateManager.getStateValueSchema,
indexOrdinal = None,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter)
=>
- val getKey = GenerateUnsafeProjection.generate(keyExpressions,
child.output)
val hasInput = iter.hasNext
if (!hasInput && keyExpressions.isEmpty) {
// If our `keyExpressions` are empty, we're getting a global
aggregation. In that case
// the `HashAggregateExec` will output a 0 value for the partial
merge. We need to
// restore the value, so that we don't overwrite our state with
a 0 value, but rather
// merge the 0 with existing state.
+ // In this case the value should represent origin row, so no
need to restore.
--- End diff --
Yes, it can be removed now. I will remove it.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]