Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21220#discussion_r185961485
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
@@ -384,22 +363,21 @@ class MicroBatchExecution(
           commitLog.purge(currentBatchId - minLogEntriesToMaintain)
         }
       }
+      noNewData = false
     } else {
-      awaitProgressLock.lock()
-      try {
-        // Wake up any threads that are waiting for the stream to progress.
-        awaitProgressLockCondition.signalAll()
-      } finally {
-        awaitProgressLock.unlock()
-      }
+      noNewData = true
+      awaitProgressLockCondition.signalAll()
     }
+    shouldConstructNextBatch
   }

   /**
    * Processes any data available between `availableOffsets` and `committedOffsets`.
    * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
    */
   private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
+    logDebug(s"Running batch $currentBatchId")
+
--- End diff ---
Well, a correct source implementation should obviously do that.
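
For context on the locking change in the diff: Condition.signalAll() must be called while holding the owning lock (it throws IllegalMonitorStateException otherwise), so dropping the explicit lock/try/finally around signalAll() presumably means the enclosing method is now expected to run with awaitProgressLock already held. A minimal sketch of the signal/await pattern, where only awaitProgressLock, awaitProgressLockCondition, and noNewData are names taken from the diff and everything else is illustrative:

    import java.util.concurrent.locks.ReentrantLock

    // Hypothetical sketch of the progress-signalling pattern; not the actual
    // MicroBatchExecution code.
    object ProgressSignalling {
      private val awaitProgressLock = new ReentrantLock()
      private val awaitProgressLockCondition = awaitProgressLock.newCondition()
      private var noNewData = false // guarded by awaitProgressLock

      // Producer side: mark that the stream has caught up and wake all waiters.
      // signalAll() is legal here only because we hold awaitProgressLock.
      def signalNoNewData(): Unit = {
        awaitProgressLock.lock()
        try {
          noNewData = true
          awaitProgressLockCondition.signalAll()
        } finally {
          awaitProgressLock.unlock()
        }
      }

      // Consumer side: block until the producer reports no new data.
      // The while loop guards against spurious wakeups.
      def awaitNoNewData(): Unit = {
        awaitProgressLock.lock()
        try {
          while (!noNewData) {
            awaitProgressLockCondition.await()
          }
        } finally {
          awaitProgressLock.unlock()
        }
      }
    }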
---