This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 64267beb0 Each job has its own listener event notify thread instead of
the only one curator thread (#2092)
64267beb0 is described below
commit 64267beb079446ec4de101d111997cb85681bd4d
Author: HungChu <[email protected]>
AuthorDate: Mon Aug 1 11:31:03 2022 +0800
Each job has its own listener event notify thread instead of the only one
curator thread (#2092)
* Each job has its own listener notify thread instead of the only one
Curator thread
* Each job has its own listener notify thread instead of the only one
Curator thread
(#2038)
* Use lamda expression instead of anonymous class for CuratorCacheListener
in watch method.
* Add ASF license
* Add unit test case
---
.../reg/base/CoordinatorRegistryCenter.java | 4 +-
.../reg/zookeeper/ZookeeperRegistryCenter.java | 12 +++-
.../ZookeeperRegistryCenterWatchTest.java | 30 +++++++-
.../lite/api/registry/JobInstanceRegistry.java | 9 ++-
.../lite/internal/listener/ListenerManager.java | 1 +
.../internal/listener/ListenerNotifierManager.java | 79 ++++++++++++++++++++++
.../lite/internal/storage/JobNodeStorage.java | 5 +-
.../listener/ListenerNotifierManagerTest.java | 38 +++++++++++
.../lite/internal/storage/JobNodeStorageTest.java | 7 +-
9 files changed, 176 insertions(+), 9 deletions(-)
diff --git
a/elasticjob-infra/elasticjob-registry-center/elasticjob-registry-center-api/src/main/java/org/apache/shardingsphere/elasticjob/reg/base/CoordinatorRegistryCenter.java
b/elasticjob-infra/elasticjob-registry-center/elasticjob-registry-center-api/src/main/java/org/apache/shardingsphere/elasticjob/reg/base/CoordinatorRegistryCenter.java
index 20aeb281b..eefd43b49 100644
---
a/elasticjob-infra/elasticjob-registry-center/elasticjob-registry-center-api/src/main/java/org/apache/shardingsphere/elasticjob/reg/base/CoordinatorRegistryCenter.java
+++
b/elasticjob-infra/elasticjob-registry-center/elasticjob-registry-center-api/src/main/java/org/apache/shardingsphere/elasticjob/reg/base/CoordinatorRegistryCenter.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.elasticjob.reg.listener.ConnectionStateChangedE
import
org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListener;
import java.util.List;
+import java.util.concurrent.Executor;
/**
* Coordinator registry center.
@@ -111,8 +112,9 @@ public interface CoordinatorRegistryCenter extends
RegistryCenter {
*
* @param key key to be watched
* @param listener data listener
+ * @param executor event notify executor
*/
- void watch(String key, DataChangedEventListener listener);
+ void watch(String key, DataChangedEventListener listener, Executor
executor);
/**
* Add connection state changed event listener to registry center.
diff --git
a/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
b/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
index a414dabea..aa6c03d34 100644
---
a/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
+++
b/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
@@ -58,6 +58,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
@@ -408,9 +409,9 @@ public final class ZookeeperRegistryCenter implements
CoordinatorRegistryCenter
}
@Override
- public void watch(final String key, final DataChangedEventListener
listener) {
+ public void watch(final String key, final DataChangedEventListener
listener, final Executor executor) {
CuratorCache cache = caches.get(key + "/");
- cache.listenable().addListener((curatorType, oldData, newData) -> {
+ CuratorCacheListener cacheListener = (curatorType, oldData, newData)
-> {
if (null == newData && null == oldData) {
return;
}
@@ -421,7 +422,12 @@ public final class ZookeeperRegistryCenter implements
CoordinatorRegistryCenter
}
byte[] data = Type.DELETED == type ? oldData.getData() :
newData.getData();
listener.onChange(new DataChangedEvent(type, path, null == data ?
"" : new String(data, StandardCharsets.UTF_8)));
- });
+ };
+ if (executor != null) {
+ cache.listenable().addListener(cacheListener, executor);
+ } else {
+ cache.listenable().addListener(cacheListener);
+ }
}
private Type getTypeFromCuratorType(final CuratorCacheListener.Type
curatorType) {
diff --git
a/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterWatchTest.java
b/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterWatchTest.java
index 28e28287b..9f1f3429d 100644
---
a/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterWatchTest.java
+++
b/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterWatchTest.java
@@ -17,14 +17,20 @@
package org.apache.shardingsphere.elasticjob.reg.zookeeper;
+import org.apache.curator.utils.ThreadUtils;
import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent;
import
org.apache.shardingsphere.elasticjob.reg.zookeeper.fixture.EmbedTestingServer;
import
org.apache.shardingsphere.elasticjob.reg.zookeeper.util.ZookeeperRegistryCenterTestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.startsWith;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
public final class ZookeeperRegistryCenterWatchTest {
@@ -45,18 +51,38 @@ public final class ZookeeperRegistryCenterWatchTest {
public static void tearDown() {
zkRegCenter.close();
}
+
+ @Test(timeout = 10000L)
+ public void assertWatchWithoutExecutor() throws InterruptedException {
+ CountDownLatch waitingForCountDownValue = new CountDownLatch(1);
+ zkRegCenter.addCacheData("/test");
+ CountDownLatch waitingForWatchReady = new CountDownLatch(1);
+ zkRegCenter.watch("/test", event -> {
+ waitingForWatchReady.countDown();
+ if (DataChangedEvent.Type.UPDATED == event.getType() &&
"countDown".equals(event.getValue())) {
+ waitingForCountDownValue.countDown();
+ }
+ }, null);
+ waitingForWatchReady.await();
+ zkRegCenter.update("/test", "countDown");
+ waitingForCountDownValue.await();
+ }
@Test(timeout = 30000L)
- public void assertWatch() throws InterruptedException {
+ public void assertWatchWithExecutor() throws InterruptedException {
CountDownLatch waitingForCountDownValue = new CountDownLatch(1);
zkRegCenter.addCacheData("/test");
CountDownLatch waitingForWatchReady = new CountDownLatch(1);
+ String threadNamePreffix = "ListenerNotify";
+ ThreadFactory threadFactory =
ThreadUtils.newGenericThreadFactory(threadNamePreffix);
+ Executor executor = Executors.newSingleThreadExecutor(threadFactory);
zkRegCenter.watch("/test", event -> {
+ assertThat(Thread.currentThread().getName(),
startsWith(threadNamePreffix));
waitingForWatchReady.countDown();
if (DataChangedEvent.Type.UPDATED == event.getType() &&
"countDown".equals(event.getValue())) {
waitingForCountDownValue.countDown();
}
- });
+ }, executor);
waitingForWatchReady.await();
zkRegCenter.update("/test", "countDown");
waitingForCountDownValue.await();
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java
index 9f7623678..1af396349 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.elasticjob.lite.api.registry;
import lombok.RequiredArgsConstructor;
+
+import org.apache.curator.utils.ThreadUtils;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
@@ -31,6 +33,9 @@ import
org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent;
import
org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListener;
import java.util.Arrays;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -51,7 +56,9 @@ public final class JobInstanceRegistry {
* Register.
*/
public void register() {
- regCenter.watch("/", new JobInstanceRegistryListener());
+ ThreadFactory threadFactory =
ThreadUtils.newGenericThreadFactory("ListenerNotify-instanceRegistry");
+ Executor executor = Executors.newSingleThreadExecutor(threadFactory);
+ regCenter.watch("/", new JobInstanceRegistryListener(), executor);
}
public class JobInstanceRegistryListener implements
DataChangedEventListener {
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerManager.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerManager.java
index 361b6fd00..987bd8ed8 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerManager.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerManager.java
@@ -58,6 +58,7 @@ public final class ListenerManager {
public ListenerManager(final CoordinatorRegistryCenter regCenter, final
String jobName, final Collection<ElasticJobListener> elasticJobListeners) {
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
+
ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
electionListenerManager = new ElectionListenerManager(regCenter,
jobName);
shardingListenerManager = new ShardingListenerManager(regCenter,
jobName);
failoverListenerManager = new FailoverListenerManager(regCenter,
jobName);
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
new file mode 100644
index 000000000..b6dae8654
--- /dev/null
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.listener;
+
+import org.apache.curator.utils.ThreadUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Manage listener's notify executor,
+ * each job has its own listener notify executor.
+ */
+public final class ListenerNotifierManager {
+
+ private static volatile ListenerNotifierManager instance;
+
+ private final Map<String, Executor> listenerNotifyExecutors = new
ConcurrentHashMap<>();
+
+ private ListenerNotifierManager() { }
+
+ /**
+ * Get singleton instance of ListenerNotifierManager.
+ * @return singleton instance of ListenerNotifierManager.
+ */
+ public static ListenerNotifierManager getInstance() {
+ if (null == instance) {
+ synchronized (ListenerNotifierManager.class) {
+ if (null == instance) {
+ instance = new ListenerNotifierManager();
+ }
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * Register a listener notify executor for the job specified.
+ * @param jobName The job's name.
+ */
+ public void registerJobNotifyExecutor(final String jobName) {
+ if (!listenerNotifyExecutors.containsKey(jobName)) {
+ synchronized (this) {
+ if (!listenerNotifyExecutors.containsKey(jobName)) {
+ ThreadFactory threadFactory =
ThreadUtils.newGenericThreadFactory("ListenerNotify-" + jobName);
+ Executor notifyExecutor =
Executors.newSingleThreadExecutor(threadFactory);
+ listenerNotifyExecutors.put(jobName, notifyExecutor);
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the listener notify executor for the specified job.
+ * @param jobName The job's name.
+ * @return The job listener's notify executor.
+ */
+ public Executor getJobNotifyExecutor(final String jobName) {
+ return listenerNotifyExecutors.get(jobName);
+ }
+}
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
index ffb30a0e2..1df239043 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.storage;
+import
org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerNotifierManager;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
import
org.apache.shardingsphere.elasticjob.reg.base.transaction.TransactionOperation;
@@ -26,6 +27,7 @@ import
org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListene
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executor;
/**
* Job node storage.
@@ -218,7 +220,8 @@ public final class JobNodeStorage {
* @param listener data listener
*/
public void addDataListener(final DataChangedEventListener listener) {
- regCenter.watch("/" + jobName, listener);
+ Executor executor =
ListenerNotifierManager.getInstance().getJobNotifyExecutor(jobName);
+ regCenter.watch("/" + jobName, listener, executor);
}
/**
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
new file mode 100644
index 000000000..8e182d8dd
--- /dev/null
+++
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.listener;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ListenerNotifierManagerTest {
+
+ @Test
+ public void assertRegisterAndGetJobNotifyExecutor() {
+ String jobName = "test_job";
+
ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
+
assertThat(ListenerNotifierManager.getInstance().getJobNotifyExecutor(jobName),
notNullValue(Executor.class));
+ }
+}
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
index 5b6569e66..cb84a7014 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.storage;
+import
org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerNotifierManager;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import
org.apache.shardingsphere.elasticjob.reg.base.transaction.TransactionOperation;
@@ -32,6 +33,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Executor;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -176,8 +178,11 @@ public final class JobNodeStorageTest {
@Test
public void assertAddDataListener() {
DataChangedEventListener listener =
mock(DataChangedEventListener.class);
+ String jobName = "test_job";
+
ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
+ Executor executor =
ListenerNotifierManager.getInstance().getJobNotifyExecutor(jobName);
jobNodeStorage.addDataListener(listener);
- verify(regCenter).watch("/test_job", listener);
+ verify(regCenter).watch("/test_job", listener, executor);
}
@Test