This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new ec1138b94 Fix issue #2116 : add executor remove where job shutdown (#2117)
ec1138b94 is described below

commit ec1138b9467cad5ef69bcce7a55b4de63eb6c357
Author: sxl <[email protected]>
AuthorDate: Thu Aug 25 09:20:23 2022 +0800

    Fix issue #2116 : add executor remove where job shutdown (#2117)
    
    Co-authored-by: songxiulu <[email protected]>
---
 .../lite/internal/listener/ListenerNotifierManager.java    | 14 ++++++++++++--
 .../elasticjob/lite/internal/schedule/JobRegistry.java     |  2 ++
 .../internal/listener/ListenerNotifierManagerTest.java     |  7 +++++++
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
index b6dae8654..6f32a4251 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.elasticjob.lite.internal.listener;
 import org.apache.curator.utils.ThreadUtils;
 
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
@@ -33,7 +35,7 @@ public final class ListenerNotifierManager {
 
     private static volatile ListenerNotifierManager instance;
 
-    private final Map<String, Executor> listenerNotifyExecutors = new ConcurrentHashMap<>();
+    private final Map<String, ExecutorService> listenerNotifyExecutors = new ConcurrentHashMap<>();
 
     private ListenerNotifierManager() { }
 
@@ -61,7 +63,7 @@ public final class ListenerNotifierManager {
             synchronized (this) {
                 if (!listenerNotifyExecutors.containsKey(jobName)) {
                     ThreadFactory threadFactory = ThreadUtils.newGenericThreadFactory("ListenerNotify-" + jobName);
-                    Executor notifyExecutor = Executors.newSingleThreadExecutor(threadFactory);
+                    ExecutorService notifyExecutor = Executors.newSingleThreadExecutor(threadFactory);
                     listenerNotifyExecutors.put(jobName, notifyExecutor);
                 }
             }
@@ -76,4 +78,12 @@ public final class ListenerNotifierManager {
     public Executor getJobNotifyExecutor(final String jobName) {
         return listenerNotifyExecutors.get(jobName);
     }
+
+    /**
+     * Remove and shutdown the listener notify executor from listenerNotifyExecutors.
+     * @param jobName The job's name.
+     */
+    public void removeJobNotifyExecutor(final String jobName) {
+        Optional.ofNullable(listenerNotifyExecutors.remove(jobName)).ifPresent(ExecutorService::shutdown);
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobRegistry.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobRegistry.java
index 07d033dc4..24f800bcb 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobRegistry.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobRegistry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerNotifierManager;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 
 import java.util.Map;
@@ -169,6 +170,7 @@ public final class JobRegistry {
     public void shutdown(final String jobName) {
         Optional.ofNullable(schedulerMap.remove(jobName)).ifPresent(JobScheduleController::shutdown);
         Optional.ofNullable(regCenterMap.remove(jobName)).ifPresent(regCenter -> regCenter.evictCacheData("/" + jobName));
+        ListenerNotifierManager.getInstance().removeJobNotifyExecutor(jobName);
         jobInstanceMap.remove(jobName);
         jobRunningMap.remove(jobName);
         currentShardingTotalCountMap.remove(jobName);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
index 8e182d8dd..cdba0cc21 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
@@ -35,4 +35,11 @@ public class ListenerNotifierManagerTest {
         ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
         assertThat(ListenerNotifierManager.getInstance().getJobNotifyExecutor(jobName), notNullValue(Executor.class));
     }
+
+    @Test
+    public void assertRemoveAndShutDownJobNotifyExecutor() {
+        String jobName = "test_job";
+        ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
+        ListenerNotifierManager.getInstance().removeJobNotifyExecutor(jobName);
+    }
 }

Reply via email to