GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19196#discussion_r138760476
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
@@ -200,18 +202,31 @@ case class StateStoreRestoreExec(
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
- iter.flatMap { row =>
- val key = getKey(row)
- val savedState = store.get(key)
- numOutputRows += 1
- row +: Option(savedState).toSeq
+ val hasInput = iter.hasNext
+ if (!hasInput && keyExpressions.isEmpty) {
--- End diff --
Add docs on why we are doing this, similar to the docs in other places related to batch aggregation.
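The diff is cut off after the `if (!hasInput && keyExpressions.isEmpty)` line, so the body of that branch is not shown here. One plausible rationale the requested docs could capture (an assumption, not confirmed by this thread): with empty `keyExpressions` the query is a global aggregation. A batch global aggregate emits one row even on empty input, so if the restore step emitted nothing for an empty partition, the final aggregation would produce a zero-initialized row that could overwrite the saved state. The plain-Scala sketch below models that effect. Every name in it (`Partial`, `Store`, `restore`, `restoreNaive`, `merge`) is hypothetical and none of it uses Spark's `StateStore` API.

```scala
// Hypothetical model of the restore step in a streaming aggregation.
// None of these names are Spark APIs; they only illustrate the special case.
object GlobalAggRestoreSketch {
  // A partial aggregate: grouping-key values plus a running count.
  final case class Partial(key: Seq[String], count: Long)

  // In-memory stand-in for the state store, keyed by grouping key.
  type Store = Map[Seq[String], Partial]

  // Restore without the special case: an empty partition emits nothing.
  def restoreNaive(input: Iterator[Partial], store: Store): Iterator[Partial] =
    input.flatMap(row => store.get(row.key).toSeq :+ row)

  // Restore with the special case: if there is no input and no grouping key
  // (a global aggregation), emit the saved state so the final merge sees it.
  def restore(input: Iterator[Partial], keyIsEmpty: Boolean, store: Store): Iterator[Partial] =
    if (!input.hasNext && keyIsEmpty) store.valuesIterator
    else restoreNaive(input, store)

  // Final merge. Like a batch global aggregate, no input and no grouping key
  // still yields one row, initialized to zero.
  def merge(rows: Iterator[Partial], keyIsEmpty: Boolean): Seq[Partial] = {
    val merged = rows.toSeq
      .groupBy(_.key)
      .map { case (k, ps) => Partial(k, ps.map(_.count).sum) }
      .toSeq
    if (merged.isEmpty && keyIsEmpty) Seq(Partial(Seq.empty, 0L)) else merged
  }

  def main(args: Array[String]): Unit = {
    val saved: Store = Map(Seq.empty[String] -> Partial(Seq.empty, 5L))
    // Empty micro-batch for a global count: the naive path resets the count to 0,
    // while the special-cased path carries the saved count of 5 forward.
    println(merge(restoreNaive(Iterator.empty, saved), keyIsEmpty = true))
    println(merge(restore(Iterator.empty, keyIsEmpty = true, saved), keyIsEmpty = true))
  }
}
```

Running `main` prints the merged result for an empty micro-batch under each path; only the special-cased `restore` keeps the saved count. When there are grouping keys, the two paths behave identically, which matches the diff guarding the new branch on `keyExpressions.isEmpty`.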
---