This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new ec1138b94 Fix issue #2116 : add executor remove where job shutdown (#2117)
ec1138b94 is described below
commit ec1138b9467cad5ef69bcce7a55b4de63eb6c357
Author: sxl <[email protected]>
AuthorDate: Thu Aug 25 09:20:23 2022 +0800
Fix issue #2116 : add executor remove where job shutdown (#2117)
Co-authored-by: songxiulu <[email protected]>
---
.../lite/internal/listener/ListenerNotifierManager.java | 14 ++++++++++++--
.../elasticjob/lite/internal/schedule/JobRegistry.java | 2 ++
.../internal/listener/ListenerNotifierManagerTest.java | 7 +++++++
3 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
index b6dae8654..6f32a4251 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.elasticjob.lite.internal.listener;
import org.apache.curator.utils.ThreadUtils;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -33,7 +35,7 @@ public final class ListenerNotifierManager {
private static volatile ListenerNotifierManager instance;
- private final Map<String, Executor> listenerNotifyExecutors = new
ConcurrentHashMap<>();
+ private final Map<String, ExecutorService> listenerNotifyExecutors = new
ConcurrentHashMap<>();
private ListenerNotifierManager() { }
@@ -61,7 +63,7 @@ public final class ListenerNotifierManager {
synchronized (this) {
if (!listenerNotifyExecutors.containsKey(jobName)) {
ThreadFactory threadFactory =
ThreadUtils.newGenericThreadFactory("ListenerNotify-" + jobName);
- Executor notifyExecutor =
Executors.newSingleThreadExecutor(threadFactory);
+ ExecutorService notifyExecutor =
Executors.newSingleThreadExecutor(threadFactory);
listenerNotifyExecutors.put(jobName, notifyExecutor);
}
}
@@ -76,4 +78,12 @@ public final class ListenerNotifierManager {
public Executor getJobNotifyExecutor(final String jobName) {
return listenerNotifyExecutors.get(jobName);
}
+
+ /**
+ * Remove and shutdown the listener notify executor from
listenerNotifyExecutors.
+ * @param jobName The job's name.
+ */
+ public void removeJobNotifyExecutor(final String jobName) {
+
Optional.ofNullable(listenerNotifyExecutors.remove(jobName)).ifPresent(ExecutorService::shutdown);
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobRegistry.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobRegistry.java
index 07d033dc4..24f800bcb 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobRegistry.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobRegistry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import
org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerNotifierManager;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import java.util.Map;
@@ -169,6 +170,7 @@ public final class JobRegistry {
public void shutdown(final String jobName) {
Optional.ofNullable(schedulerMap.remove(jobName)).ifPresent(JobScheduleController::shutdown);
Optional.ofNullable(regCenterMap.remove(jobName)).ifPresent(regCenter
-> regCenter.evictCacheData("/" + jobName));
+ ListenerNotifierManager.getInstance().removeJobNotifyExecutor(jobName);
jobInstanceMap.remove(jobName);
jobRunningMap.remove(jobName);
currentShardingTotalCountMap.remove(jobName);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
index 8e182d8dd..cdba0cc21 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
@@ -35,4 +35,11 @@ public class ListenerNotifierManagerTest {
ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
assertThat(ListenerNotifierManager.getInstance().getJobNotifyExecutor(jobName),
notNullValue(Executor.class));
}
+
+ @Test
+ public void assertRemoveAndShutDownJobNotifyExecutor() {
+ String jobName = "test_job";
+
ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
+ ListenerNotifierManager.getInstance().removeJobNotifyExecutor(jobName);
+ }
}