Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/19196#discussion_r138988133
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
---
@@ -200,18 +202,31 @@ case class StateStoreRestoreExec(
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
- iter.flatMap { row =>
- val key = getKey(row)
- val savedState = store.get(key)
- numOutputRows += 1
- row +: Option(savedState).toSeq
+ val hasInput = iter.hasNext
+ if (!hasInput && keyExpressions.isEmpty) {
--- End diff --
There weren't any docs in batch :)
---