Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19926#discussion_r156471355 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -783,29 +430,29 @@ class StreamExecution( } while (notDone) { - awaitBatchLock.lock() + awaitProgressLock.lock() try { - awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS) + awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS) if (streamDeathCause != null) { throw streamDeathCause } } finally { - awaitBatchLock.unlock() + awaitProgressLock.unlock() } } logDebug(s"Unblocked at $newOffset for $source") } /** A flag to indicate that a batch has completed with no new data available. */ - @volatile private var noNewData = false + @volatile protected var noNewData = false --- End diff -- Yes. The flag is really just a test-harness hook; it's only used in processAllAvailable, so that tests can block until there's a batch (or, now, an epoch) that doesn't contain any data.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org