GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21220#discussion_r185973062
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
---
@@ -128,40 +130,49 @@ class MicroBatchExecution(
* Repeatedly attempts to run batches as data arrives.
*/
protected def runActivatedStream(sparkSessionForStream: SparkSession):
Unit = {
- triggerExecutor.execute(() => {
- startTrigger()
+ triggerExecutor.execute(() => {
if (isActive) {
+ var currentBatchIsRunnable = false // Whether the current batch is
runnable / has been run
+ var currentBatchHadNewData = false // Whether the current batch
had new data
+
reportTimeTaken("triggerExecution") {
+ startTrigger()
+
+ // We'll do this initialization only once every start / restart
if (currentBatchId < 0) {
- // We'll do this initialization only once
populateStartOffsets(sparkSessionForStream)
-
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
- logDebug(s"Stream running from $committedOffsets to
$availableOffsets")
- } else {
- constructNextBatch()
+ logInfo(s"Stream started from $committedOffsets")
}
- if (dataAvailable) {
- currentStatus = currentStatus.copy(isDataAvailable = true)
+
+
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+
+ // Try to construct the next batch. This will return true only
if the next batch is
+ // ready and runnable. Note that the current batch may be
runnable even without
+ // new data to process as `constructNextBatch` may decide to run
a batch for
+ // state cleanup, etc. `isNewDataAvailable` will be updated to
reflect whether new data
+ // is available or not.
+ currentBatchIsRunnable = constructNextBatch()
+
+ currentStatus = currentStatus.copy(isDataAvailable =
isNewDataAvailable)
+ if (currentBatchIsRunnable) {
updateStatusMessage("Processing new data")
+ // Remember whether the current batch has data or not. This
will be required later
--- End diff --
Fixed it.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]