This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9aa878ea1f9 [Fix](Job)Fixed job scheduling missing certain time window schedules (#28659) 9aa878ea1f9 is described below commit 9aa878ea1f951693e8b5b4c21b0d52f140bd420b Author: Calvin Kirs <acm_mas...@163.com> AuthorDate: Wed Dec 20 09:21:15 2023 +0800 [Fix](Job)Fixed job scheduling missing certain time window schedules (#28659) Since scheduling itself consumes a certain amount of time, the start time of the time window should not be the current time, but the end time of the last schedule. --- .../org/apache/doris/job/base/JobExecutionConfiguration.java | 4 ++-- .../main/java/org/apache/doris/job/scheduler/JobScheduler.java | 10 ++++++---- .../apache/doris/job/base/JobExecutionConfigurationTest.java | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 16b9dd24281..0b44073464f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -178,8 +178,8 @@ public class JobExecutionConfiguration { // Calculate the trigger time list for (long triggerTime = firstTriggerTime; triggerTime <= windowEndTimeMs; triggerTime += intervalMs) { - if (triggerTime >= currentTimeMs && (null == timerDefinition.getEndTimeMs() - || triggerTime < timerDefinition.getEndTimeMs())) { + if (null == timerDefinition.getEndTimeMs() + || triggerTime < timerDefinition.getEndTimeMs()) { timerDefinition.setLatestSchedulerTimeMs(triggerTime); timestamps.add(queryDelayTimeSecond(currentTimeMs, triggerTime)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 47e91d97b49..08bbbb6dbab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -130,7 +130,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { schedulerInstantJob(job, TaskType.SCHEDULED, null); } //if it's timer job and trigger last window already start, we will scheduler it immediately - cycleTimerJobScheduler(job); + cycleTimerJobScheduler(job, System.currentTimeMillis()); } @Override @@ -139,9 +139,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { } - private void cycleTimerJobScheduler(T job) { + private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(), - System.currentTimeMillis(), latestBatchSchedulerTimerTaskTimeMs); + startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs); if (CollectionUtils.isNotEmpty(delaySeconds)) { delaySeconds.forEach(delaySecond -> { TimerJobSchedulerTask<T> timerJobSchedulerTask = new TimerJobSchedulerTask<>(timerJobDisruptor, job); @@ -170,6 +170,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger */ private void executeTimerJobIdsWithinLastTenMinutesWindow() { + + long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs; if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) { this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); } @@ -186,7 +188,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) { continue; } - cycleTimerJobScheduler(job); + cycleTimerJobScheduler(job, lastTimeWindowMs); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index 87d1430375a..91678ee5c1d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -64,7 +64,7 @@ public class JobExecutionConfigurationTest { Assertions.assertArrayEquals(new Long[]{ 500L}, delayTimes.toArray()); delayTimes = configuration.getTriggerDelayTimes( 1001000L, 0L, 1000000L); - Assertions.assertEquals(0, delayTimes.size()); + Assertions.assertEquals(1, delayTimes.size()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org