zsxwing commented on a change in pull request #21220:
URL: https://github.com/apache/spark/pull/21220#discussion_r483707961
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
##########
@@ -384,22 +363,21 @@ class MicroBatchExecution(
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
}
}
+ noNewData = false
} else {
- awaitProgressLock.lock()
- try {
- // Wake up any threads that are waiting for the stream to progress.
- awaitProgressLockCondition.signalAll()
- } finally {
- awaitProgressLock.unlock()
- }
+ noNewData = true
+ awaitProgressLockCondition.signalAll()
}
+ shouldConstructNextBatch
}
/**
* Processes any data available between `availableOffsets` and
`committedOffsets`.
* @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch
with.
*/
private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
+ logDebug(s"Running batch $currentBatchId")
+
Review comment:
@brkyvz @tdas We don't call `getBatch` if the start and end offsets of a source are the same.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org