Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/17525#discussion_r109821142
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
---
@@ -35,6 +42,56 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
assert(processingTimeExecutor.nextBatchTime(150) === 200)
}
+ test("trigger timing") {
+ val executedTimes = new mutable.ArrayBuffer[Long]
+ val manualClock = new StreamManualClock()
+ @volatile var continueExecuting = true
+ @volatile var lastTriggerTime = -1L
+ @volatile var clockIncrementInTrigger = 0L
+ val executor = ProcessingTimeExecutor(ProcessingTime("1000
milliseconds"), manualClock)
+ val executorThread = new Thread() {
+ override def run(): Unit = {
+ executor.execute(() => {
+ // Record the trigger time, increment clock if needed and
+ lastTriggerTime = manualClock.getTimeMillis()
+ manualClock.advance(clockIncrementInTrigger)
+ clockIncrementInTrigger = 0 // reset this so that there are no
runaway triggers
+ continueExecuting
+ })
+ }
+ }
+ executorThread.start()
+ // First batch should execute immediately, then executor should wait
for next one
+ eventually {
+ assert(lastTriggerTime === 0)
+ assert(manualClock.isStreamWaitingAt(0))
+ assert(manualClock.isStreamWaitingFor(1000))
+ }
+
+ // Second batch should execute when clock reaches the next trigger
time.
+ // If next trigger takes less than the trigger interval, executor
should wait for next one
+ clockIncrementInTrigger = 500
+ manualClock.setTime(1000)
+ eventually {
+ assert(lastTriggerTime === 1000)
+ assert(manualClock.isStreamWaitingAt(1500))
+ assert(manualClock.isStreamWaitingFor(2000))
+ }
+
+ // If next trigger takes less than the trigger interval, executor
should immediately execute
+ // another one
+ clockIncrementInTrigger = 1500
+ manualClock.setTime(2000)
+ eventually {
+ assert(lastTriggerTime === 3500)
--- End diff --
it was hard to understand that the test is actually testing that this value
is `3500` instead of `2000`. Could you add a quick comment?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]