This is an automated email from the ASF dual-hosted git repository.

linghengqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f2cea628 feat: Support business code capture interruption during exit 
(#2475)
5f2cea628 is described below

commit 5f2cea628c917701325919b03031b17607c26bc8
Author: zhangxunwei <[email protected]>
AuthorDate: Thu Sep 11 13:28:14 2025 +0800

    feat: Support business code capture interruption during exit (#2475)
    
    * feat: Support business code capture interruption during exit
    
    * fix: maintain variables correctly at the end
    
    * fix: fix known issue and code styles
    
    * fix: fix known issue
    
    * fix: fix related issue and test code
    
    * refactor: refactor test code
    
    * Bump Curator to `5.9.0` to fix incompatible changes around Zookeeper 
Client `3.9.3`
    
    * Apply spotless code formatting
    
    ---------
    
    Co-authored-by: Qiheng He <[email protected]>
---
 .../threadpool/ExecutorServiceReloader.java        |  4 +-
 .../kernel/internal/schedule/JobScheduler.java     |  2 +
 .../kernel/internal/schedule/LiteJob.java          | 22 +++++++++--
 .../threadpool/ExecutorServiceReloaderTest.java    |  4 +-
 .../test/natived/it/operation/JavaTest.java        | 44 ++++++++++++++++++++++
 5 files changed, 69 insertions(+), 7 deletions(-)

diff --git 
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java
 
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java
index 25769242b..4f377e6a4 100644
--- 
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java
+++ 
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java
@@ -47,7 +47,7 @@ public final class ExecutorServiceReloader implements 
Closeable {
         if 
(jobExecutorThreadPoolSizeProviderType.equals(jobConfig.getJobExecutorThreadPoolSizeProviderType()))
 {
             return;
         }
-        executorService.shutdown();
+        executorService.shutdownNow();
         init(jobConfig);
     }
     
@@ -59,6 +59,6 @@ public final class ExecutorServiceReloader implements 
Closeable {
     
     @Override
     public void close() {
-        executorService.shutdown();
+        executorService.shutdownNow();
     }
 }
diff --git 
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
 
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
index fbeab942e..859ccd58b 100644
--- 
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
+++ 
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
@@ -178,6 +178,7 @@ public final class JobScheduler {
         result.put("org.quartz.jobStore.misfireThreshold", "1");
         result.put("org.quartz.plugin.shutdownhook.class", 
JobShutdownHookPlugin.class.getName());
         result.put("org.quartz.plugin.shutdownhook.cleanShutdown", 
Boolean.TRUE.toString());
+        result.put("org.quartz.scheduler.interruptJobsOnShutdown", 
Boolean.TRUE.toString());
         return result;
     }
     
@@ -200,6 +201,7 @@ public final class JobScheduler {
     public void shutdown() {
         setUpFacade.tearDown();
         schedulerFacade.shutdownInstance();
+        jobScheduleController.shutdown(false);
         jobExecutor.shutdown();
     }
 }
diff --git 
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java
 
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java
index fdf57b99e..9b1ec359d 100644
--- 
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java
+++ 
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java
@@ -19,20 +19,36 @@ package 
org.apache.shardingsphere.elasticjob.kernel.internal.schedule;
 
 import lombok.Setter;
 import org.apache.shardingsphere.elasticjob.kernel.executor.ElasticJobExecutor;
-import org.quartz.Job;
+import org.quartz.InterruptableJob;
 import org.quartz.JobExecutionContext;
+import org.quartz.UnableToInterruptJobException;
+
+import java.util.Objects;
 
 /**
  * Lite job.
  */
 @Setter
-public final class LiteJob implements Job {
+public final class LiteJob implements InterruptableJob {
     
     private ElasticJobExecutor jobExecutor;
     
+    private volatile Thread currentThread;
+    
     @Override
     public void execute(final JobExecutionContext context) {
-        jobExecutor.execute();
+        try {
+            currentThread = Thread.currentThread();
+            jobExecutor.execute();
+        } finally {
+            currentThread = null;
+        }
     }
     
+    @Override
+    public void interrupt() throws UnableToInterruptJobException {
+        if (Objects.nonNull(currentThread)) {
+            currentThread.interrupt();
+        }
+    }
 }
diff --git 
a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java
 
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java
index 55deeead5..1ec2a3ebb 100644
--- 
a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java
+++ 
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java
@@ -57,7 +57,7 @@ class ExecutorServiceReloaderTest {
         ReflectionUtils.setFieldValue(executorServiceReloader, 
"executorService", mockExecutorService);
         JobConfiguration jobConfig = JobConfiguration.newBuilder("job", 
1).build();
         executorServiceReloader.reloadIfNecessary(jobConfig);
-        verify(mockExecutorService).shutdown();
+        verify(mockExecutorService).shutdownNow();
         ExecutorService actual = executorServiceReloader.getExecutorService();
         assertFalse(actual.isShutdown());
         assertFalse(actual.isTerminated());
@@ -81,6 +81,6 @@ class ExecutorServiceReloaderTest {
         ExecutorServiceReloader executorServiceReloader = new 
ExecutorServiceReloader(JobConfiguration.newBuilder("job", 
1).jobExecutorThreadPoolSizeProviderType("SINGLE_THREAD").build());
         ReflectionUtils.setFieldValue(executorServiceReloader, 
"executorService", mockExecutorService);
         executorServiceReloader.close();
-        verify(mockExecutorService).shutdown();
+        verify(mockExecutorService).shutdownNow();
     }
 }
diff --git 
a/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java
 
b/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java
index 939866716..7fb5ac529 100644
--- 
a/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java
+++ 
b/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java
@@ -44,6 +44,7 @@ import 
org.apache.shardingsphere.elasticjob.lifecycle.internal.statistics.Shardi
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
 import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import 
org.apache.shardingsphere.elasticjob.test.natived.commons.job.simple.JavaSimpleJob;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
@@ -54,11 +55,13 @@ import org.junit.jupiter.api.condition.EnabledInNativeImage;
 import javax.sql.DataSource;
 import java.io.IOException;
 import java.time.Duration;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -305,4 +308,45 @@ class JavaTest {
         });
         job.shutdown();
     }
+    
+    @Test
+    void testWhenShutdownThenTaskCanCaptureInterruptedException() throws 
Exception {
+        testCaptureInterruptedException(1);
+        testCaptureInterruptedException(2);
+    }
+    
+    private void testCaptureInterruptedException(final int shardingTotalCount) 
throws Exception {
+        String jobName = "testTaskCaptureInterruptedTask" + shardingTotalCount;
+        AtomicBoolean captured = new AtomicBoolean(false);
+        AtomicBoolean running = new AtomicBoolean(false);
+        LocalTime oneSecondsLater = LocalTime.now().plusSeconds(1);
+        String cronExpression = String.format("%d %d %d * * ?", 
oneSecondsLater.getSecond(), oneSecondsLater.getMinute(), 
oneSecondsLater.getHour());
+        SimpleJob captureInterruptedTask = shardingContext -> {
+            try {
+                running.set(true);
+                
+                while (true) {
+                    if (Thread.currentThread().isInterrupted()) {
+                        captured.set(true);
+                        running.set(false);
+                        break;
+                    }
+                    System.out.println("Running...");
+                    Thread.sleep(100);
+                }
+            } catch (final InterruptedException e) {
+                captured.set(true);
+                running.set(false);
+                Thread.currentThread().interrupt();
+            }
+        };
+        ScheduleJobBootstrap job = new ScheduleJobBootstrap(firstRegCenter, 
captureInterruptedTask,
+                JobConfiguration.newBuilder(jobName, shardingTotalCount)
+                        .cron(cronExpression)
+                        .build());
+        job.schedule();
+        Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(100L, 
TimeUnit.MILLISECONDS).until(running::get);
+        job.shutdown();
+        Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(100L, 
TimeUnit.MILLISECONDS).untilAsserted(() -> assertThat(captured.get(), 
is(true)));
+    }
 }

Reply via email to