Github user CodingCat commented on a diff in the pull request:
https://github.com/apache/spark/pull/19926#discussion_r156457754
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
---
@@ -783,29 +430,29 @@ class StreamExecution(
}
while (notDone) {
- awaitBatchLock.lock()
+ awaitProgressLock.lock()
try {
- awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+ awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
if (streamDeathCause != null) {
throw streamDeathCause
}
} finally {
- awaitBatchLock.unlock()
+ awaitProgressLock.unlock()
}
}
logDebug(s"Unblocked at $newOffset for $source")
}
/** A flag to indicate that a batch has completed with no new data
available. */
- @volatile private var noNewData = false
+ @volatile protected var noNewData = false
--- End diff --
Will the `noNewData` flag still be useful for continuous processing?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]