Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19196#discussion_r138988133
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
    @@ -200,18 +202,31 @@ case class StateStoreRestoreExec(
           sqlContext.sessionState,
           Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) 
=>
             val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
    -        iter.flatMap { row =>
    -          val key = getKey(row)
    -          val savedState = store.get(key)
    -          numOutputRows += 1
    -          row +: Option(savedState).toSeq
    +        val hasInput = iter.hasNext
    +        if (!hasInput && keyExpressions.isEmpty) {
    --- End diff --
    
    there wasn't any docs in batch :)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to