This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aa878ea1f9 [Fix](Job)Fixed job scheduling missing certain time window 
schedules (#28659)
9aa878ea1f9 is described below

commit 9aa878ea1f951693e8b5b4c21b0d52f140bd420b
Author: Calvin Kirs <acm_mas...@163.com>
AuthorDate: Wed Dec 20 09:21:15 2023 +0800

    [Fix](Job)Fixed job scheduling missing certain time window schedules 
(#28659)
    
    Since scheduling itself consumes a certain amount of time, the start time 
of the time window should not be the current time, but the end time of the last 
schedule.
---
 .../org/apache/doris/job/base/JobExecutionConfiguration.java   |  4 ++--
 .../main/java/org/apache/doris/job/scheduler/JobScheduler.java | 10 ++++++----
 .../apache/doris/job/base/JobExecutionConfigurationTest.java   |  2 +-
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 16b9dd24281..0b44073464f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -178,8 +178,8 @@ public class JobExecutionConfiguration {
 
         // Calculate the trigger time list
         for (long triggerTime = firstTriggerTime; triggerTime <= 
windowEndTimeMs; triggerTime += intervalMs) {
-            if (triggerTime >= currentTimeMs && (null == 
timerDefinition.getEndTimeMs()
-                    || triggerTime < timerDefinition.getEndTimeMs())) {
+            if (null == timerDefinition.getEndTimeMs()
+                    || triggerTime < timerDefinition.getEndTimeMs()) {
                 timerDefinition.setLatestSchedulerTimeMs(triggerTime);
                 timestamps.add(queryDelayTimeSecond(currentTimeMs, 
triggerTime));
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 47e91d97b49..08bbbb6dbab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -130,7 +130,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
             schedulerInstantJob(job, TaskType.SCHEDULED, null);
         }
         //if it's timer job and trigger last window already start, we will 
scheduler it immediately
-        cycleTimerJobScheduler(job);
+        cycleTimerJobScheduler(job, System.currentTimeMillis());
     }
 
     @Override
@@ -139,9 +139,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
     }
 
 
-    private void cycleTimerJobScheduler(T job) {
+    private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
         List<Long> delaySeconds = 
job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
-                System.currentTimeMillis(), 
latestBatchSchedulerTimerTaskTimeMs);
+                startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
         if (CollectionUtils.isNotEmpty(delaySeconds)) {
             delaySeconds.forEach(delaySecond -> {
                 TimerJobSchedulerTask<T> timerJobSchedulerTask = new 
TimerJobSchedulerTask<>(timerJobDisruptor, job);
@@ -170,6 +170,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
      * We will get the task in the next time window, and then hand it over to 
the time wheel for timing trigger
      */
     private void executeTimerJobIdsWithinLastTenMinutesWindow() {
+
+        long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
         if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
             this.latestBatchSchedulerTimerTaskTimeMs = 
System.currentTimeMillis();
         }
@@ -186,7 +188,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
             if (!job.getJobStatus().equals(JobStatus.RUNNING) && 
!job.getJobConfig().checkIsTimerJob()) {
                 continue;
             }
-            cycleTimerJobScheduler(job);
+            cycleTimerJobScheduler(job, lastTimeWindowMs);
         }
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index 87d1430375a..91678ee5c1d 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -64,7 +64,7 @@ public class JobExecutionConfigurationTest {
         Assertions.assertArrayEquals(new Long[]{ 500L}, delayTimes.toArray());
         delayTimes = configuration.getTriggerDelayTimes(
                 1001000L, 0L, 1000000L);
-        Assertions.assertEquals(0, delayTimes.size());
+        Assertions.assertEquals(1, delayTimes.size());
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to