This is an automated email from the ASF dual-hosted git repository.
linghengqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 5f2cea628 feat: Support business code capture interruption during exit (#2475)
5f2cea628 is described below
commit 5f2cea628c917701325919b03031b17607c26bc8
Author: zhangxunwei <[email protected]>
AuthorDate: Thu Sep 11 13:28:14 2025 +0800
feat: Support business code capture interruption during exit (#2475)
* feat: Support business code capture interruption during exit
* fix: maintain variables correctly at the end
* fix: fix known issue and code styles
* fix: fix known issue
* fix: fix related issue and test code
* refactor: refactor test code
* Bump Curator to `5.9.0` to fix incompatible changes around Zookeeper Client `3.9.3`
* Apply spotless code formatting
---------
Co-authored-by: Qiheng He <[email protected]>
---
.../threadpool/ExecutorServiceReloader.java | 4 +-
.../kernel/internal/schedule/JobScheduler.java | 2 +
.../kernel/internal/schedule/LiteJob.java | 22 +++++++++--
.../threadpool/ExecutorServiceReloaderTest.java | 4 +-
.../test/natived/it/operation/JavaTest.java | 44 ++++++++++++++++++++++
5 files changed, 69 insertions(+), 7 deletions(-)
diff --git
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java
index 25769242b..4f377e6a4 100644
---
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java
+++
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java
@@ -47,7 +47,7 @@ public final class ExecutorServiceReloader implements
Closeable {
if
(jobExecutorThreadPoolSizeProviderType.equals(jobConfig.getJobExecutorThreadPoolSizeProviderType()))
{
return;
}
- executorService.shutdown();
+ executorService.shutdownNow();
init(jobConfig);
}
@@ -59,6 +59,6 @@ public final class ExecutorServiceReloader implements
Closeable {
@Override
public void close() {
- executorService.shutdown();
+ executorService.shutdownNow();
}
}
diff --git
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
index fbeab942e..859ccd58b 100644
---
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
+++
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
@@ -178,6 +178,7 @@ public final class JobScheduler {
result.put("org.quartz.jobStore.misfireThreshold", "1");
result.put("org.quartz.plugin.shutdownhook.class",
JobShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown",
Boolean.TRUE.toString());
+ result.put("org.quartz.scheduler.interruptJobsOnShutdown",
Boolean.TRUE.toString());
return result;
}
@@ -200,6 +201,7 @@ public final class JobScheduler {
public void shutdown() {
setUpFacade.tearDown();
schedulerFacade.shutdownInstance();
+ jobScheduleController.shutdown(false);
jobExecutor.shutdown();
}
}
diff --git
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java
index fdf57b99e..9b1ec359d 100644
---
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java
+++
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java
@@ -19,20 +19,36 @@ package
org.apache.shardingsphere.elasticjob.kernel.internal.schedule;
import lombok.Setter;
import org.apache.shardingsphere.elasticjob.kernel.executor.ElasticJobExecutor;
-import org.quartz.Job;
+import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
+import org.quartz.UnableToInterruptJobException;
+
+import java.util.Objects;
/**
* Lite job.
*/
@Setter
-public final class LiteJob implements Job {
+public final class LiteJob implements InterruptableJob {
private ElasticJobExecutor jobExecutor;
+ private volatile Thread currentThread;
+
@Override
public void execute(final JobExecutionContext context) {
- jobExecutor.execute();
+ try {
+ currentThread = Thread.currentThread();
+ jobExecutor.execute();
+ } finally {
+ currentThread = null;
+ }
}
+ @Override
+ public void interrupt() throws UnableToInterruptJobException {
+ if (Objects.nonNull(currentThread)) {
+ currentThread.interrupt();
+ }
+ }
}
diff --git
a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java
index 55deeead5..1ec2a3ebb 100644
---
a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java
+++
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java
@@ -57,7 +57,7 @@ class ExecutorServiceReloaderTest {
ReflectionUtils.setFieldValue(executorServiceReloader,
"executorService", mockExecutorService);
JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).build();
executorServiceReloader.reloadIfNecessary(jobConfig);
- verify(mockExecutorService).shutdown();
+ verify(mockExecutorService).shutdownNow();
ExecutorService actual = executorServiceReloader.getExecutorService();
assertFalse(actual.isShutdown());
assertFalse(actual.isTerminated());
@@ -81,6 +81,6 @@ class ExecutorServiceReloaderTest {
ExecutorServiceReloader executorServiceReloader = new
ExecutorServiceReloader(JobConfiguration.newBuilder("job",
1).jobExecutorThreadPoolSizeProviderType("SINGLE_THREAD").build());
ReflectionUtils.setFieldValue(executorServiceReloader,
"executorService", mockExecutorService);
executorServiceReloader.close();
- verify(mockExecutorService).shutdown();
+ verify(mockExecutorService).shutdownNow();
}
}
diff --git
a/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java
b/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java
index 939866716..7fb5ac529 100644
---
a/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java
+++
b/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java
@@ -44,6 +44,7 @@ import
org.apache.shardingsphere.elasticjob.lifecycle.internal.statistics.Shardi
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import
org.apache.shardingsphere.elasticjob.test.natived.commons.job.simple.JavaSimpleJob;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
@@ -54,11 +55,13 @@ import org.junit.jupiter.api.condition.EnabledInNativeImage;
import javax.sql.DataSource;
import java.io.IOException;
import java.time.Duration;
+import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -305,4 +308,45 @@ class JavaTest {
});
job.shutdown();
}
+
+ @Test
+ void testWhenShutdownThenTaskCanCaptureInterruptedException() throws
Exception {
+ testCaptureInterruptedException(1);
+ testCaptureInterruptedException(2);
+ }
+
+ private void testCaptureInterruptedException(final int shardingTotalCount)
throws Exception {
+ String jobName = "testTaskCaptureInterruptedTask" + shardingTotalCount;
+ AtomicBoolean captured = new AtomicBoolean(false);
+ AtomicBoolean running = new AtomicBoolean(false);
+ LocalTime oneSecondsLater = LocalTime.now().plusSeconds(1);
+ String cronExpression = String.format("%d %d %d * * ?",
oneSecondsLater.getSecond(), oneSecondsLater.getMinute(),
oneSecondsLater.getHour());
+ SimpleJob captureInterruptedTask = shardingContext -> {
+ try {
+ running.set(true);
+
+ while (true) {
+ if (Thread.currentThread().isInterrupted()) {
+ captured.set(true);
+ running.set(false);
+ break;
+ }
+ System.out.println("Running...");
+ Thread.sleep(100);
+ }
+ } catch (final InterruptedException e) {
+ captured.set(true);
+ running.set(false);
+ Thread.currentThread().interrupt();
+ }
+ };
+ ScheduleJobBootstrap job = new ScheduleJobBootstrap(firstRegCenter,
captureInterruptedTask,
+ JobConfiguration.newBuilder(jobName, shardingTotalCount)
+ .cron(cronExpression)
+ .build());
+ job.schedule();
+ Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(100L,
TimeUnit.MILLISECONDS).until(running::get);
+ job.shutdown();
+ Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(100L,
TimeUnit.MILLISECONDS).untilAsserted(() -> assertThat(captured.get(),
is(true)));
+ }
}