Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185961485
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -384,22 +363,21 @@ class MicroBatchExecution(
               commitLog.purge(currentBatchId - minLogEntriesToMaintain)
             }
           }
    +      noNewData = false
         } else {
    -      awaitProgressLock.lock()
    -      try {
    -        // Wake up any threads that are waiting for the stream to progress.
    -        awaitProgressLockCondition.signalAll()
    -      } finally {
    -        awaitProgressLock.unlock()
    -      }
    +      noNewData = true
    +      awaitProgressLockCondition.signalAll()
         }
    +    shouldConstructNextBatch
       }
     
       /**
        * Processes any data available between `availableOffsets` and `committedOffsets`.
        * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
        */
       private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
    +    logDebug(s"Running batch $currentBatchId")
    +
    --- End diff --
    
    Well, a correct source implementation should obviously do that.

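    For context, the code removed in this hunk is the standard
    java.util.concurrent lock/condition wake-up pattern; `Condition.signalAll()`
    must be called while holding the lock (otherwise it throws
    `IllegalMonitorStateException`), so the bare `signalAll()` in the new code
    presumably runs with the caller already inside the lock. A minimal
    self-contained sketch of the pattern, with illustrative names mirroring the
    diff rather than the actual Spark class:

    ```scala
    import java.util.concurrent.locks.ReentrantLock

    object ProgressSignal {
      private val awaitProgressLock = new ReentrantLock()
      private val awaitProgressLockCondition = awaitProgressLock.newCondition()
      private var noNewData = false // guarded by awaitProgressLock

      // Called by the batch loop when a trigger finds no new offsets.
      def signalNoNewData(): Unit = {
        awaitProgressLock.lock()
        try {
          noNewData = true
          // signalAll() requires the lock to be held by this thread.
          awaitProgressLockCondition.signalAll()
        } finally {
          awaitProgressLock.unlock()
        }
      }

      // Called by waiters, e.g. processAllAvailable()-style callers that
      // block until the stream reports no new data.
      def awaitNoNewData(): Unit = {
        awaitProgressLock.lock()
        try {
          while (!noNewData) {
            awaitProgressLockCondition.await()
          }
        } finally {
          awaitProgressLock.unlock()
        }
      }
    }
    ```

    (Sketch only: `ProgressSignal`, `signalNoNewData`, and `awaitNoNewData` are
    hypothetical names; the real fields shown in the diff are
    `awaitProgressLock`, `awaitProgressLockCondition`, and `noNewData`.)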

---
