This is an automated email from the ASF dual-hosted git repository.
jimin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new d31840961a refactor: optimize Configuration Cache (#6420)
d31840961a is described below
commit d31840961a358af82abc872ecd2a8ef402d23351
Author: jimin <[email protected]>
AuthorDate: Thu Mar 28 10:11:11 2024 +0800
refactor: optimize Configuration Cache (#6420)
---
changes/en-us/2.x.md | 2 +
changes/zh-cn/2.x.md | 3 +-
.../seata/config/consul/ConsulConfiguration.java | 6 +--
.../config/CachedConfigurationChangeListener.java | 28 ++++++++++++
.../apache/seata/config/ConfigurationCache.java | 51 ++++------------------
.../seata/config/ConfigurationChangeListener.java | 8 ++--
.../org/apache/seata/config/FileConfiguration.java | 2 +-
.../seata/config/ConfigurationCacheTests.java | 40 +----------------
.../apache/seata/config/FileConfigurationTest.java | 29 +++++++-----
.../seata/config/ProConfigurationFactoryTest.java | 2 +-
.../config/RegistryConfigurationFactoryTest.java | 1 -
.../seata/config/YamlConfigurationFactoryTest.java | 2 +-
.../src/test/resources/file-test-pro.conf | 3 ++
.../src/test/resources/file-test-yaml.conf | 4 ++
.../seata/config/CustomConfigurationForTest.java | 1 -
.../seata/config/etcd3/EtcdConfiguration.java | 2 +-
.../seata/core/rpc/netty/NettyClientConfig.java | 2 +-
.../seata/core/rpc/netty/NettyRemotingServer.java | 4 ++
.../seata/core/rpc/netty/NettyServerBootstrap.java | 4 ++
.../seata/core/rpc/netty/NettyServerConfig.java | 10 +++--
.../core/rpc/netty/RmNettyRemotingClient.java | 25 ++++++-----
.../core/rpc/netty/TmNettyRemotingClient.java | 12 ++---
.../seata/core/rpc/netty/NettyClientTestSuite.java | 25 +++++++++++
.../seata/core/rpc/netty/RmNettyClientTest.java | 34 +++++++++------
.../seata/core/rpc/netty/TmNettyClientTest.java | 50 ++++++++++++---------
.../seata/discovery/registry/RegistryService.java | 11 +++--
.../GlobalTransactionalInterceptorHandler.java | 20 ++++-----
.../GlobalTransactionalInterceptorParser.java | 7 +--
pom.xml | 10 +++++
.../rm/datasource/exec/LockRetryController.java | 10 ++---
.../storage/db/lock/DataBaseDistributedLocker.java | 7 ++-
.../annotation/GlobalTransactionScanner.java | 17 ++++----
.../seata/common/ConfigurationTestHelper.java | 17 +++++---
.../seata/saga/engine/db/AbstractServerTest.java | 6 ++-
34 files changed, 252 insertions(+), 203 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 91632d04d8..be4b6ce807 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -113,6 +113,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6387](https://github.com/apache/incubator-seata/pull/6387)] optimize tcc use compatible
- [[#6402](https://github.com/apache/incubator-seata/pull/6402)] optimize rm-datasource use compatible
- [[#6419](https://github.com/apache/incubator-seata/pull/6419)] optimize integration-tx-api compatible
+- [[#6405](https://github.com/apache/incubator-seata/pull/6405)] fix kotlin compile failure
- [[#6412](https://github.com/apache/incubator-seata/pull/6412)] optimize core compatible module
- [[#6429](https://github.com/apache/incubator-seata/pull/6429)] remove repetitive words
@@ -140,6 +141,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6280](https://github.com/apache/incubator-seata/pull/6280)] refactor Saga designer using diagram-js
- [[#6269](https://github.com/apache/incubator-seata/pull/6269)] standardize Seata Exception
+- [[#6420](https://github.com/apache/incubator-seata/pull/6420)] refactor Configuration Cache
Thanks to these contributors for their code commits. Please report an unintended omission.
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index a32b723df5..61229c5ce6 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -115,10 +115,10 @@
- [[#6387](https://github.com/apache/incubator-seata/pull/6387)] 优化tcc使用兼容
- [[#6402](https://github.com/apache/incubator-seata/pull/6402)] 优化rm-datasource向下兼容
- [[#6419](https://github.com/apache/incubator-seata/pull/6419)] 优化integration-tx-api向下兼容
+- [[#6405](https://github.com/apache/incubator-seata/pull/6405)] 修复 kotlin 编译失败
- [[#6412](https://github.com/apache/incubator-seata/pull/6412)] 优化 core 兼容模块
- [[#6429](https://github.com/apache/incubator-seata/pull/6429)] 移除重复注释
-
### security:
- [[#6069](https://github.com/apache/incubator-seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞
- [[#6144](https://github.com/apache/incubator-seata/pull/6144)] 升级Nacos依赖版本至1.4.6
@@ -141,6 +141,7 @@
- [[#6280](https://github.com/apache/incubator-seata/pull/6280)] 使用diagram-js重构Saga设计器
- [[#6269](https://github.com/apache/incubator-seata/pull/6269)] 统一Seata异常规范
+- [[#6420](https://github.com/apache/incubator-seata/pull/6420)] 优化配置缓存
非常感谢以下 contributors 的代码贡献。若有无意遗漏,请报告。
diff --git
a/config/seata-config-consul/src/main/java/org/apache/seata/config/consul/ConsulConfiguration.java
b/config/seata-config-consul/src/main/java/org/apache/seata/config/consul/ConsulConfiguration.java
index 31fe5acfda..1a81de7d67 100644
---
a/config/seata-config-consul/src/main/java/org/apache/seata/config/consul/ConsulConfiguration.java
+++
b/config/seata-config-consul/src/main/java/org/apache/seata/config/consul/ConsulConfiguration.java
@@ -20,9 +20,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Enumeration;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -346,7 +346,7 @@ public class ConsulConfiguration extends
AbstractConfiguration {
for (ConfigurationChangeListener
changeListener : entry.getValue()) {
event.setDataId(key).setNewValue(valueNew);
ConfigurationChangeListener listener =
((ConsulListener) changeListener).getTargetListener();
- listener.onChangeEvent(event);
+ listener.onProcessEvent(event);
}
}
}
@@ -354,7 +354,7 @@ public class ConsulConfiguration extends
AbstractConfiguration {
} else {
// The old config change listener,it would be deleted
in next edition
event.setDataId(dataId).setNewValue(value);
- listener.onChangeEvent(event);
+ listener.onProcessEvent(event);
}
}
}
diff --git
a/config/seata-config-core/src/main/java/org/apache/seata/config/CachedConfigurationChangeListener.java
b/config/seata-config-core/src/main/java/org/apache/seata/config/CachedConfigurationChangeListener.java
new file mode 100644
index 0000000000..074eadc586
--- /dev/null
+++
b/config/seata-config-core/src/main/java/org/apache/seata/config/CachedConfigurationChangeListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.config;
+
+public interface CachedConfigurationChangeListener extends
ConfigurationChangeListener {
+
+ ConfigurationCache CONFIGURATION_CACHE = ConfigurationCache.getInstance();
+
+ @Override
+ default void afterEvent(ConfigurationChangeEvent event) {
+ ConfigurationChangeListener listener =
(ConfigurationChangeListener)CONFIGURATION_CACHE;
+ listener.onProcessEvent(event);
+ }
+}
diff --git
a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationCache.java
b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationCache.java
index 889e59bc74..6bc8573c20 100644
---
a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationCache.java
+++
b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationCache.java
@@ -21,8 +21,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.seata.common.util.CollectionUtils;
+
import org.apache.seata.common.util.DurationUtil;
import org.apache.seata.common.util.StringUtils;
@@ -37,45 +38,7 @@ public class ConfigurationCache implements
ConfigurationChangeListener {
private static final Map<String, ObjectWrapper> CONFIG_CACHE = new
ConcurrentHashMap<>();
- private Map<String, HashSet<ConfigurationChangeListener>>
configListenersMap = new HashMap<>();
-
- public static void addConfigListener(String dataId,
ConfigurationChangeListener... listeners) {
- if (StringUtils.isBlank(dataId)) {
- return;
- }
- synchronized (ConfigurationCache.class) {
- HashSet<ConfigurationChangeListener> listenerHashSet =
- getInstance().configListenersMap.computeIfAbsent(dataId, key
-> new HashSet<>());
- if (!listenerHashSet.contains(getInstance())) {
- ConfigurationFactory.getInstance().addConfigListener(dataId,
getInstance());
- listenerHashSet.add(getInstance());
- }
- if (null != listeners && listeners.length > 0) {
- for (ConfigurationChangeListener listener : listeners) {
- if (!listenerHashSet.contains(listener)) {
- listenerHashSet.add(listener);
-
ConfigurationFactory.getInstance().addConfigListener(dataId, listener);
- }
- }
- }
- }
- }
-
- public static void removeConfigListener(String dataId,
ConfigurationChangeListener... listeners) {
- if (StringUtils.isBlank(dataId)) {
- return;
- }
- synchronized (ConfigurationCache.class) {
- final HashSet<ConfigurationChangeListener> listenerSet =
getInstance().configListenersMap.get(dataId);
- if (CollectionUtils.isNotEmpty(listenerSet)) {
- for (ConfigurationChangeListener listener : listeners) {
- if (listenerSet.remove(listener)) {
-
ConfigurationFactory.getInstance().removeConfigListener(dataId, listener);
- }
- }
- }
- }
- }
+ private static final Set<String> DATA_ID_CACHED = new HashSet<>();
public static ConfigurationCache getInstance() {
return ConfigurationCacheInstance.INSTANCE;
@@ -91,11 +54,12 @@ public class ConfigurationCache implements
ConfigurationChangeListener {
} else {
Object newValue = new ObjectWrapper(event.getNewValue(),
null).convertData(oldWrapper.getType());
if (!Objects.equals(oldWrapper.getData(), newValue)) {
- CONFIG_CACHE.put(event.getDataId(), new
ObjectWrapper(newValue, oldWrapper.getType(),oldWrapper.getLastDefaultValue()));
+ CONFIG_CACHE.put(event.getDataId(),
+ new ObjectWrapper(newValue, oldWrapper.getType(),
oldWrapper.getLastDefaultValue()));
}
}
} else {
- CONFIG_CACHE.remove(event.getDataId());
+ CONFIG_CACHE.remove(event.getDataId(), oldWrapper);
}
}
@@ -115,6 +79,9 @@ public class ConfigurationCache implements
ConfigurationChangeListener {
}
if (null == wrapper
|| (null != defaultValue &&
!Objects.equals(defaultValue, wrapper.lastDefaultValue))) {
+ if (DATA_ID_CACHED.add(rawDataId)) {
+ originalConfiguration.addConfigListener(rawDataId,
this);
+ }
Object result = method.invoke(originalConfiguration,
args);
// The wrapper.data only exists in the cache when it
is not null.
if (result != null) {
diff --git
a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeListener.java
b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeListener.java
index 338f0a06e1..3c69735e9c 100644
---
a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeListener.java
+++
b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeListener.java
@@ -58,9 +58,9 @@ public interface ConfigurationChangeListener {
*/
default void onProcessEvent(ConfigurationChangeEvent event) {
getExecutorService().submit(() -> {
- beforeEvent();
+ beforeEvent(event);
onChangeEvent(event);
- afterEvent();
+ afterEvent(event);
});
}
@@ -83,14 +83,14 @@ public interface ConfigurationChangeListener {
/**
* Before event.
*/
- default void beforeEvent() {
+ default void beforeEvent(ConfigurationChangeEvent event) {
}
/**
* After event.
*/
- default void afterEvent() {
+ default void afterEvent(ConfigurationChangeEvent event) {
}
}
diff --git
a/config/seata-config-core/src/main/java/org/apache/seata/config/FileConfiguration.java
b/config/seata-config-core/src/main/java/org/apache/seata/config/FileConfiguration.java
index f0feed1435..08e8be181f 100644
---
a/config/seata-config-core/src/main/java/org/apache/seata/config/FileConfiguration.java
+++
b/config/seata-config-core/src/main/java/org/apache/seata/config/FileConfiguration.java
@@ -394,7 +394,7 @@ public class FileConfiguration extends
AbstractConfiguration {
event.setDataId(dataId).setNewValue(currentConfig).setOldValue(oldConfig);
for (ConfigurationChangeListener listener :
dataIdMap.get(dataId)) {
- listener.onChangeEvent(event);
+ listener.onProcessEvent(event);
}
}
}
diff --git
a/config/seata-config-core/src/test/java/org/apache/seata/config/ConfigurationCacheTests.java
b/config/seata-config-core/src/test/java/org/apache/seata/config/ConfigurationCacheTests.java
index c26ab4eb3b..010753092c 100644
---
a/config/seata-config-core/src/test/java/org/apache/seata/config/ConfigurationCacheTests.java
+++
b/config/seata-config-core/src/test/java/org/apache/seata/config/ConfigurationCacheTests.java
@@ -16,17 +16,12 @@
*/
package org.apache.seata.config;
-import org.apache.seata.common.util.CollectionUtils;
+import java.time.Duration;
+
import org.apache.seata.common.util.DurationUtil;
-import org.apache.seata.common.util.ReflectionUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.lang.reflect.Field;
-import java.time.Duration;
-import java.util.HashSet;
-import java.util.Map;
-
public class ConfigurationCacheTests {
@@ -70,37 +65,6 @@ public class ConfigurationCacheTests {
Assertions.assertNull(test);
}
- // FIXME: 2023/2/19 wait bugfix
- // @Test
- public void testConfigListener() throws Exception {
- Configuration configuration = new FileConfiguration("registry");
- configuration = ConfigurationCache.getInstance().proxy(configuration);
-
- // get config listeners map
- Field configListenersMapField =
ReflectionUtil.getField(ConfigurationCache.class, "configListenersMap");
- Map<String, HashSet<ConfigurationChangeListener>> configListenersMap =
(Map<String,
-
HashSet<ConfigurationChangeListener>>)configListenersMapField.get(ConfigurationCache.getInstance());
-
- boolean value =
configuration.getBoolean("service.disableGlobalTransaction");
- TestListener listener = new TestListener();
-
ConfigurationCache.addConfigListener("service.disableGlobalTransaction",
listener);
- // check listener if exist
- HashSet<ConfigurationChangeListener> listeners =
configListenersMap.get("service.disableGlobalTransaction");
- Assertions.assertTrue(CollectionUtils.isNotEmpty(listeners));
- // change value,trigger listener
- System.setProperty("service.disableGlobalTransaction",
String.valueOf(!value));
- // remove null
- ConfigurationCache.removeConfigListener(null);
- // check listener if exist
- listeners = configListenersMap.get("service.disableGlobalTransaction");
- Assertions.assertTrue(CollectionUtils.isNotEmpty(listeners));
- // remove listener
-
ConfigurationCache.removeConfigListener("service.disableGlobalTransaction",
listener);
- // check listener if exist
- listeners = configListenersMap.get("service.disableGlobalTransaction");
- // is empty
- Assertions.assertTrue(CollectionUtils.isEmpty(listeners));
- }
public static class TestListener implements ConfigurationChangeListener {
diff --git
a/config/seata-config-core/src/test/java/org/apache/seata/config/FileConfigurationTest.java
b/config/seata-config-core/src/test/java/org/apache/seata/config/FileConfigurationTest.java
index 1be77f1a11..5c754e0b8b 100644
---
a/config/seata-config-core/src/test/java/org/apache/seata/config/FileConfigurationTest.java
+++
b/config/seata-config-core/src/test/java/org/apache/seata/config/FileConfigurationTest.java
@@ -18,6 +18,7 @@ package org.apache.seata.config;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -40,19 +41,27 @@ class FileConfigurationTest {
void addConfigListener() throws InterruptedException {
Configuration fileConfig = ConfigurationFactory.getInstance();
CountDownLatch countDownLatch = new CountDownLatch(1);
- boolean value =
fileConfig.getBoolean("service.disableGlobalTransaction");
-
ConfigurationCache.addConfigListener("service.disableGlobalTransaction",
(event) -> {
- Assertions.assertEquals(Boolean.parseBoolean(event.getNewValue()),
!Boolean.parseBoolean(event.getOldValue()));
- countDownLatch.countDown();
+ String dataId = "service.disableGlobalTransaction";
+ boolean value = fileConfig.getBoolean(dataId);
+ fileConfig.addConfigListener(dataId, new
CachedConfigurationChangeListener() {
+ @Override
+ public void onChangeEvent(ConfigurationChangeEvent event) {
+
Assertions.assertEquals(Boolean.parseBoolean(event.getNewValue()),
+ !Boolean.parseBoolean(event.getOldValue()));
+ countDownLatch.countDown();
+ }
});
- System.setProperty("service.disableGlobalTransaction",
String.valueOf(!value));
- countDownLatch.await(5, TimeUnit.SECONDS);
+ System.setProperty(dataId, String.valueOf(!value));
+ countDownLatch.await(2, TimeUnit.SECONDS);
System.setProperty("file.listener.enabled", "false");
- System.setProperty("service.disableGlobalTransaction",
String.valueOf(value));
- Thread.sleep(2000);
- boolean currentValue =
fileConfig.getBoolean("service.disableGlobalTransaction");
+ //wait for loop safety, loop time is LISTENER_CONFIG_INTERVAL=1s
+ Thread.sleep(1500);
+ System.setProperty(dataId, String.valueOf(value));
+ //sleep for a period of time to simulate waiting for a cache
refresh.Actually, it doesn't trigger.
+ Thread.sleep(1000);
+ boolean currentValue = fileConfig.getBoolean(dataId);
Assertions.assertNotEquals(value, currentValue);
- System.setProperty("service.disableGlobalTransaction",
String.valueOf(!value));
+ System.setProperty(dataId, String.valueOf(!value));
}
@Test
diff --git
a/config/seata-config-core/src/test/java/org/apache/seata/config/ProConfigurationFactoryTest.java
b/config/seata-config-core/src/test/java/org/apache/seata/config/ProConfigurationFactoryTest.java
index aaa685dffa..a1c107d139 100644
---
a/config/seata-config-core/src/test/java/org/apache/seata/config/ProConfigurationFactoryTest.java
+++
b/config/seata-config-core/src/test/java/org/apache/seata/config/ProConfigurationFactoryTest.java
@@ -33,7 +33,7 @@ class ProConfigurationFactoryTest {
Assertions.assertNull(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.testNull"));
Assertions.assertNull(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.testExist"));
Configuration instance = ConfigurationFactory.getInstance();
-
Assertions.assertEquals(instance.getConfig("service.disableGlobalTransaction"),
"true");
+
Assertions.assertEquals(instance.getConfig("client.undo.compress.enable"),
"true");
Assertions.assertEquals(instance.getConfig("service.default.grouplist"),
"127.0.0.1:8092");
}
diff --git
a/config/seata-config-core/src/test/java/org/apache/seata/config/RegistryConfigurationFactoryTest.java
b/config/seata-config-core/src/test/java/org/apache/seata/config/RegistryConfigurationFactoryTest.java
index b04a7b8fdd..c04ddc9700 100644
---
a/config/seata-config-core/src/test/java/org/apache/seata/config/RegistryConfigurationFactoryTest.java
+++
b/config/seata-config-core/src/test/java/org/apache/seata/config/RegistryConfigurationFactoryTest.java
@@ -30,7 +30,6 @@ class RegistryConfigurationFactoryTest {
ConfigurationFactory.reload();
Assertions.assertEquals(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.name"),"file-test.conf");
Configuration instance = ConfigurationFactory.getInstance();
-
Assertions.assertEquals(instance.getConfig("service.disableGlobalTransaction"),"true");
Assertions.assertEquals(instance.getConfig("service.default.grouplist"),
"127.0.0.1:8091");
}
diff --git
a/config/seata-config-core/src/test/java/org/apache/seata/config/YamlConfigurationFactoryTest.java
b/config/seata-config-core/src/test/java/org/apache/seata/config/YamlConfigurationFactoryTest.java
index 2c8958d73c..f50d31808a 100644
---
a/config/seata-config-core/src/test/java/org/apache/seata/config/YamlConfigurationFactoryTest.java
+++
b/config/seata-config-core/src/test/java/org/apache/seata/config/YamlConfigurationFactoryTest.java
@@ -33,7 +33,7 @@ class YamlConfigurationFactoryTest {
Assertions.assertNull(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.testNull"));
Assertions.assertNull(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.testExist"));
Configuration instance = ConfigurationFactory.getInstance();
-
Assertions.assertEquals(instance.getConfig("service.disableGlobalTransaction"),
"true");
+ Assertions.assertEquals(instance.getConfig("transport.heartbeat"),
"true");
Assertions.assertEquals(instance.getConfig("service.default.grouplist"),
"127.0.0.1:8093");
}
diff --git a/config/seata-config-core/src/test/resources/file-test-pro.conf
b/config/seata-config-core/src/test/resources/file-test-pro.conf
index 49b0f12968..63098d6ac4 100644
--- a/config/seata-config-core/src/test/resources/file-test-pro.conf
+++ b/config/seata-config-core/src/test/resources/file-test-pro.conf
@@ -22,4 +22,7 @@ service {
default.grouplist = "127.0.0.1:8092"
#disable seata
disableGlobalTransaction = true
+}
+client {
+ undo.compress.enable=true
}
\ No newline at end of file
diff --git a/config/seata-config-core/src/test/resources/file-test-yaml.conf
b/config/seata-config-core/src/test/resources/file-test-yaml.conf
index 7ec3ed688c..120200b1e6 100644
--- a/config/seata-config-core/src/test/resources/file-test-yaml.conf
+++ b/config/seata-config-core/src/test/resources/file-test-yaml.conf
@@ -22,4 +22,8 @@ service {
default.grouplist = "127.0.0.1:8093"
#disable seata
disableGlobalTransaction = true
+}
+
+transport {
+ heartbeat = true
}
\ No newline at end of file
diff --git
a/config/seata-config-custom/src/test/java/org/apache/seata/config/CustomConfigurationForTest.java
b/config/seata-config-custom/src/test/java/org/apache/seata/config/CustomConfigurationForTest.java
index fc20d198e2..1fcbfdfb51 100644
---
a/config/seata-config-custom/src/test/java/org/apache/seata/config/CustomConfigurationForTest.java
+++
b/config/seata-config-custom/src/test/java/org/apache/seata/config/CustomConfigurationForTest.java
@@ -60,7 +60,6 @@ public class CustomConfigurationForTest extends
AbstractConfiguration {
@Override
public void addConfigListener(String dataId, ConfigurationChangeListener
listener) {
- throw new UnsupportedOperationException();
}
@Override
diff --git
a/config/seata-config-etcd3/src/main/java/org/apache/seata/config/etcd3/EtcdConfiguration.java
b/config/seata-config-etcd3/src/main/java/org/apache/seata/config/etcd3/EtcdConfiguration.java
index b7a9106b9f..c81d06e5af 100644
---
a/config/seata-config-etcd3/src/main/java/org/apache/seata/config/etcd3/EtcdConfiguration.java
+++
b/config/seata-config-etcd3/src/main/java/org/apache/seata/config/etcd3/EtcdConfiguration.java
@@ -394,7 +394,7 @@ public class EtcdConfiguration extends
AbstractConfiguration {
List<KeyValue> keyValues = getResponse.getKvs();
if (CollectionUtils.isNotEmpty(keyValues)) {
event.setDataId(dataId).setNewValue(keyValues.get(0).getValue().toString(UTF_8));
- listener.onChangeEvent(event);
+ listener.onProcessEvent(event);
}
} catch (Exception e) {
LOGGER.error("error occurred while getting value{}",
e.getMessage(), e);
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java
index 972998c945..f0e047ad58 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java
@@ -51,7 +51,7 @@ public class NettyClientConfig extends NettyBaseConfig {
private static final int MAX_CHECK_ALIVE_RETRY = 300;
private static final int CHECK_ALIVE_INTERVAL = 10;
private static final String SOCKET_ADDRESS_START_CHAR = "/";
- private static final long MAX_ACQUIRE_CONN_MILLS = 60 * 1000L;
+ private static final long MAX_ACQUIRE_CONN_MILLS = 10 * 1000L;
private static final String RPC_DISPATCH_THREAD_PREFIX = "rpcDispatch";
private static final int DEFAULT_MAX_POOL_ACTIVE = 1;
private static final int DEFAULT_MIN_POOL_IDLE = 0;
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java
index 715230131e..3e6ec63c15 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java
@@ -68,6 +68,10 @@ public class NettyRemotingServer extends
AbstractNettyRemotingServer {
super(messageExecutor, new NettyServerConfig());
}
+ public NettyRemotingServer(ThreadPoolExecutor messageExecutor,
NettyServerConfig nettyServerConfig) {
+ super(messageExecutor, nettyServerConfig);
+ }
+
/**
* Sets transactionMessageHandler.
*
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
index 493992b1f4..6c9a325882 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
@@ -77,6 +77,10 @@ public class NettyServerBootstrap implements
RemotingBootstrap {
new
NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
nettyServerConfig.getServerWorkerThreads()));
}
+
+ if (nettyServerConfig.getServerListenPort() > 0) {
+ setListenPort(nettyServerConfig.getServerListenPort());
+ }
}
/**
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerConfig.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerConfig.java
index 1d945e2671..9dd373ba0b 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerConfig.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerConfig.java
@@ -49,7 +49,7 @@ public class NettyServerConfig extends NettyBaseConfig {
ConfigurationKeys.TRANSPORT_PREFIX + "writeBufferHighWaterMark",
String.valueOf(67108864)));
private int writeBufferLowWaterMark = Integer.parseInt(System.getProperty(
ConfigurationKeys.TRANSPORT_PREFIX + "writeBufferLowWaterMark",
String.valueOf(1048576)));
- private static final int DEFAULT_LISTEN_PORT = 8091;
+ private int serverListenPort = 0;
private static final long RPC_TC_REQUEST_TIMEOUT =
CONFIG.getLong(ConfigurationKeys.RPC_TC_REQUEST_TIMEOUT,
DEFAULT_RPC_TC_REQUEST_TIMEOUT);
private int serverChannelMaxIdleTimeSeconds =
Integer.parseInt(System.getProperty(
ConfigurationKeys.TRANSPORT_PREFIX +
"serverChannelMaxIdleTimeSeconds", String.valueOf(30)));
@@ -217,8 +217,12 @@ public class NettyServerConfig extends NettyBaseConfig {
*
* @return the listen port
*/
- public int getDefaultListenPort() {
- return DEFAULT_LISTEN_PORT;
+ public int getServerListenPort() {
+ return serverListenPort;
+ }
+
+ public void setServerListenPort(int port) {
+ this.serverListenPort = port;
}
/**
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java
index 61f23687d9..92cbafd0a5 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java
@@ -16,6 +16,14 @@
*/
package org.apache.seata.core.rpc.netty;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutorGroup;
import org.apache.seata.common.DefaultValues;
@@ -23,9 +31,9 @@ import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.StringUtils;
-import org.apache.seata.config.ConfigurationCache;
+import org.apache.seata.config.CachedConfigurationChangeListener;
+import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationChangeEvent;
-import org.apache.seata.config.ConfigurationChangeListener;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.core.model.Resource;
@@ -43,14 +51,6 @@ import
org.apache.seata.core.rpc.processor.client.RmUndoLogProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-
import static org.apache.seata.common.Constants.DBKEYS_SPLIT_CHAR;
/**
@@ -92,9 +92,10 @@ public final class RmNettyRemotingClient extends
AbstractNettyRemotingClient {
ThreadPoolExecutor messageExecutor) {
super(nettyClientConfig, eventExecutorGroup, messageExecutor,
TransactionRole.RMROLE);
// set enableClientBatchSendRequest
- this.enableClientBatchSendRequest =
ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST,
+ Configuration configuration = ConfigurationFactory.getInstance();
+ this.enableClientBatchSendRequest =
configuration.getBoolean(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST,
ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_CLIENT_BATCH_SEND_REQUEST,DefaultValues.DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST));
-
ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST,
new ConfigurationChangeListener() {
+
configuration.addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST,
new CachedConfigurationChangeListener() {
@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
String dataId = event.getDataId();
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java
index f9efc61167..68ff739bbb 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java
@@ -21,17 +21,19 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+
import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutorGroup;
+import org.apache.commons.lang.StringUtils;
import org.apache.seata.common.DefaultValues;
import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.thread.RejectedPolicies;
import org.apache.seata.common.util.NetUtil;
-import org.apache.seata.config.ConfigurationCache;
+import org.apache.seata.config.CachedConfigurationChangeListener;
+import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationChangeEvent;
-import org.apache.seata.config.ConfigurationChangeListener;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.auth.AuthSigner;
import org.apache.seata.core.constants.ConfigurationKeys;
@@ -41,7 +43,6 @@ import org.apache.seata.core.protocol.RegisterTMRequest;
import org.apache.seata.core.protocol.RegisterTMResponse;
import org.apache.seata.core.rpc.processor.client.ClientHeartbeatProcessor;
import org.apache.seata.core.rpc.processor.client.ClientOnResponseProcessor;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,9 +69,10 @@ public final class TmNettyRemotingClient extends
AbstractNettyRemotingClient {
super(nettyClientConfig, eventExecutorGroup, messageExecutor,
NettyPoolKey.TransactionRole.TMROLE);
this.signer = EnhancedServiceLoader.load(AuthSigner.class);
// set enableClientBatchSendRequest
- this.enableClientBatchSendRequest =
ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,
+ Configuration configuration = ConfigurationFactory.getInstance();
+ this.enableClientBatchSendRequest =
configuration.getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,
DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST);
-
ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,
new ConfigurationChangeListener() {
+
configuration.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,
new CachedConfigurationChangeListener() {
@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
String dataId = event.getDataId();
diff --git
a/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientTestSuite.java
b/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientTestSuite.java
new file mode 100644
index 0000000000..45387fc155
--- /dev/null
+++
b/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientTestSuite.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import org.junit.platform.suite.api.SelectClasses;
+import org.junit.platform.suite.api.Suite;
+
+@Suite
+@SelectClasses({TmNettyClientTest.class, RmNettyClientTest.class})
+public class NettyClientTestSuite {
+}
diff --git
a/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java
b/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java
index 14266b1508..1b365501b5 100644
--- a/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java
+++ b/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java
@@ -16,45 +16,47 @@
*/
package org.apache.seata.core.rpc.netty;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.exception.FrameworkException;
+import org.apache.seata.config.CachedConfigurationChangeListener;
import org.apache.seata.config.ConfigurationCache;
+import org.apache.seata.config.ConfigurationChangeEvent;
+import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.model.Resource;
import org.apache.seata.core.model.ResourceManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Rm RPC client test.
- *
*/
+@Order(2)
class RmNettyClientTest {
-
+
Logger logger = LoggerFactory.getLogger(getClass());
-
+
@BeforeAll
public static void beforeAll() {
RmNettyRemotingClient.getInstance().destroy();
-
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
"true");
}
@AfterAll
public static void afterAll() {
RmNettyRemotingClient.getInstance().destroy();
-
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
"false");
}
@Test
@@ -85,12 +87,16 @@ class RmNettyClientTest {
Mockito.when(resourceManager.getManagedResources()).thenReturn(resourceMap);
newClient.setResourceManager(resourceManager);
System.setProperty("file.listener.enabled", "true");
-
ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
- event -> logger.info("dataId:{}, value: {}, oldValue: {}",
event.getDataId(), event.getNewValue(),
- event.getOldValue()));
+
ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
new CachedConfigurationChangeListener() {
+ @Override
+ public void onChangeEvent(ConfigurationChangeEvent event) {
+ logger.info("dataId:{}, value: {}, oldValue: {}",
event.getDataId(), event.getNewValue(), event.getOldValue());
+ }
+ });
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
"true");
- Thread.sleep(2000);
+ ConfigurationCache.clear();
Assertions.assertThrows(FrameworkException.class, newClient::init);
+
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
"false");
}
private AtomicBoolean getInitializeStatus(final RmNettyRemotingClient
rmNettyRemotingClient) {
diff --git
a/core/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java
b/core/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java
index ca620f73b8..924711438f 100644
--- a/core/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java
+++ b/core/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java
@@ -16,37 +16,40 @@
*/
package org.apache.seata.core.rpc.netty;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.exception.FrameworkException;
+import org.apache.seata.config.CachedConfigurationChangeListener;
import org.apache.seata.config.ConfigurationCache;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.seata.config.ConfigurationChangeEvent;
+import org.apache.seata.config.ConfigurationFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
/**
* The type Tm rpc client test.
- *
*/
+@Order(1)
public class TmNettyClientTest {
Logger logger = LoggerFactory.getLogger(getClass());
- private static final ThreadPoolExecutor
- workingThreads = new ThreadPoolExecutor(100, 500, 500,
TimeUnit.SECONDS,
+ private static final ThreadPoolExecutor workingThreads = new
ThreadPoolExecutor(100, 500, 500, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20000), new
ThreadPoolExecutor.CallerRunsPolicy());
/**
@@ -79,18 +82,19 @@ public class TmNettyClientTest {
@Test
public void testInit() throws Exception {
String applicationId = "app 1";
- String transactionServiceGroup = "group A";
- TmNettyRemotingClient tmNettyRemotingClient =
TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
-
+ String transactionServiceGroup = "default_tx_group";
+ TmNettyRemotingClient tmNettyRemotingClient =
TmNettyRemotingClient.getInstance(applicationId,
+ transactionServiceGroup);
+
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
"false");
+ ConfigurationCache.clear();
tmNettyRemotingClient.init();
-
//check if attr of tmNettyClient object has been set success
Field clientBootstrapField = getDeclaredField(tmNettyRemotingClient,
"clientBootstrap");
clientBootstrapField.setAccessible(true);
NettyClientBootstrap clientBootstrap =
(NettyClientBootstrap)clientBootstrapField.get(tmNettyRemotingClient);
Field bootstrapField = getDeclaredField(clientBootstrap, "bootstrap");
bootstrapField.setAccessible(true);
- Bootstrap bootstrap = (Bootstrap) bootstrapField.get(clientBootstrap);
+ Bootstrap bootstrap = (Bootstrap)bootstrapField.get(clientBootstrap);
Assertions.assertNotNull(bootstrap);
Field optionsField = getDeclaredField(bootstrap, "options");
@@ -134,20 +138,26 @@ public class TmNettyClientTest {
@AfterAll
public static void afterAll() {
TmNettyRemotingClient.getInstance().destroy();
-
System.setProperty(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
"false");
}
@Test
public void testCheckFailFast() throws Exception {
TmNettyRemotingClient.getInstance().destroy();
+
System.setProperty(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
"false");
TmNettyRemotingClient tmClient =
TmNettyRemotingClient.getInstance("fail_fast", "default_tx_group");
System.setProperty("file.listener.enabled", "true");
-
ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
- event -> logger.info("dataId:{}, value: {}, oldValue: {}",
event.getDataId(), event.getNewValue(),
- event.getOldValue()));
+
ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
+ new CachedConfigurationChangeListener() {
+ @Override
+ public void onChangeEvent(ConfigurationChangeEvent event) {
+ logger.info("dataId:{}, value: {}, oldValue: {}",
event.getDataId(), event.getNewValue(),
+ event.getOldValue());
+ }
+ });
System.setProperty(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
"true");
- Thread.sleep(2000);
+ ConfigurationCache.clear();
Assertions.assertThrows(FrameworkException.class, tmClient::init);
+
System.setProperty(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
"false");
}
/**
diff --git
a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java
b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java
index d1d0a4f3e0..fd9561434f 100644
---
a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java
+++
b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java
@@ -17,16 +17,16 @@
package org.apache.seata.discovery.registry;
import java.net.InetSocketAddress;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Collection;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-import org.apache.seata.config.ConfigurationCache;
+
import org.apache.seata.config.ConfigurationFactory;
/**
@@ -113,7 +113,6 @@ public interface RegistryService<T> {
default String getServiceGroup(String key) {
key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING
+ key;
if (!SERVICE_GROUP_NAME.contains(key)) {
- ConfigurationCache.addConfigListener(key);
SERVICE_GROUP_NAME.add(key);
}
return ConfigurationFactory.getInstance().getConfig(key);
diff --git
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
index e390b79433..288bd5dd78 100644
---
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
+++
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
@@ -27,9 +27,9 @@ import com.google.common.eventbus.Subscribe;
import org.apache.seata.common.exception.ShouldNeverHappenException;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.StringUtils;
-import org.apache.seata.config.ConfigurationCache;
+import org.apache.seata.config.CachedConfigurationChangeListener;
+import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationChangeEvent;
-import org.apache.seata.config.ConfigurationChangeListener;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.core.event.EventBus;
@@ -70,7 +70,7 @@ import static
org.apache.seata.tm.api.GlobalTransactionRole.Participant;
* The type Global transactional interceptor handler.
*
*/
-public class GlobalTransactionalInterceptorHandler extends
AbstractProxyInvocationHandler implements ConfigurationChangeListener {
+public class GlobalTransactionalInterceptorHandler extends
AbstractProxyInvocationHandler implements CachedConfigurationChangeListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(GlobalTransactionalInterceptorHandler.class);
@@ -116,20 +116,18 @@ public class GlobalTransactionalInterceptorHandler
extends AbstractProxyInvocati
public GlobalTransactionalInterceptorHandler(FailureHandler
failureHandler, Set<String> methodsToProxy) {
this.failureHandler = failureHandler == null ?
FailureHandlerHolder.getFailureHandler() : failureHandler;
this.methodsToProxy = methodsToProxy;
- this.disable =
ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
+ Configuration configuration = ConfigurationFactory.getInstance();
+ this.disable =
configuration.getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
DEFAULT_DISABLE_GLOBAL_TRANSACTION);
-
- boolean degradeCheck =
ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
+ boolean degradeCheck =
configuration.getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
DEFAULT_TM_DEGRADE_CHECK);
- degradeCheckPeriod = ConfigurationFactory.getInstance()
- .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD,
DEFAULT_TM_DEGRADE_CHECK_PERIOD);
- degradeCheckAllowTimes = ConfigurationFactory.getInstance()
- .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES,
DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
+ degradeCheckPeriod =
configuration.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD,
DEFAULT_TM_DEGRADE_CHECK_PERIOD);
+ degradeCheckAllowTimes =
configuration.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES,
DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
EVENT_BUS.register(this);
if (degradeCheck && degradeCheckPeriod > 0 && degradeCheckAllowTimes >
0) {
startDegradeCheck();
}
-
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
this);
+
configuration.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
this.initDefaultGlobalTransactionTimeout();
}
diff --git
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/GlobalTransactionalInterceptorParser.java
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/GlobalTransactionalInterceptorParser.java
index 7a4afb964c..d7edfa458e 100644
---
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/GlobalTransactionalInterceptorParser.java
+++
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/GlobalTransactionalInterceptorParser.java
@@ -19,11 +19,12 @@ package
org.apache.seata.integration.tx.api.interceptor.parser;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;
+
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.ReflectionUtil;
-import org.apache.seata.config.ConfigurationCache;
-import org.apache.seata.config.ConfigurationChangeListener;
+import org.apache.seata.config.CachedConfigurationChangeListener;
+import org.apache.seata.config.ConfigurationFactory;
import
org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler;
import
org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
import org.apache.seata.spring.annotation.GlobalLock;
@@ -50,7 +51,7 @@ public class GlobalTransactionalInterceptorParser implements
InterfaceParser {
if (existsAnnotation(serviceInterface) ||
existsAnnotation(interfacesIfJdk)) {
ProxyInvocationHandler proxyInvocationHandler =
createProxyInvocationHandler();
-
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener) proxyInvocationHandler);
+
ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(CachedConfigurationChangeListener) proxyInvocationHandler);
return proxyInvocationHandler;
}
diff --git a/pom.xml b/pom.xml
index a9feab80b9..c558031e11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,16 @@
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-suite-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-suite-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/LockRetryController.java
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/LockRetryController.java
index 454a55955d..e81bdf1281 100644
---
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/LockRetryController.java
+++
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/LockRetryController.java
@@ -18,10 +18,9 @@ package org.apache.seata.rm.datasource.exec;
import org.apache.seata.common.DefaultValues;
import org.apache.seata.common.util.NumberUtils;
+import org.apache.seata.config.CachedConfigurationChangeListener;
import org.apache.seata.config.Configuration;
-import org.apache.seata.config.ConfigurationCache;
import org.apache.seata.config.ConfigurationChangeEvent;
-import org.apache.seata.config.ConfigurationChangeListener;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.core.context.GlobalLockConfigHolder;
@@ -35,10 +34,11 @@ import org.apache.seata.core.model.GlobalLockConfig;
public class LockRetryController {
private static final GlobalConfig LISTENER = new GlobalConfig();
+ private static final Configuration CONFIG =
ConfigurationFactory.getInstance();
static {
-
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_INTERVAL,
LISTENER);
-
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_TIMES,
LISTENER);
+ CONFIG.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_INTERVAL,
LISTENER);
+ CONFIG.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_TIMES,
LISTENER);
}
private int lockRetryInterval;
@@ -98,7 +98,7 @@ public class LockRetryController {
return LISTENER.getGlobalLockRetryTimes();
}
- static class GlobalConfig implements ConfigurationChangeListener {
+ static class GlobalConfig implements CachedConfigurationChangeListener {
private volatile int globalLockRetryInterval;
diff --git
a/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java
b/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java
index f702334247..5321532d5b 100644
---
a/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java
+++
b/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java
@@ -32,10 +32,9 @@ import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.common.loader.Scope;
import org.apache.seata.common.util.IOUtil;
import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.config.CachedConfigurationChangeListener;
import org.apache.seata.config.Configuration;
-import org.apache.seata.config.ConfigurationCache;
import org.apache.seata.config.ConfigurationChangeEvent;
-import org.apache.seata.config.ConfigurationChangeListener;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.core.constants.ServerTableColumnsName;
@@ -94,7 +93,7 @@ public class DataBaseDistributedLocker implements
DistributedLocker {
if (StringUtils.isBlank(distributedLockTable)) {
demotion = true;
- ConfigurationCache.addConfigListener(DISTRIBUTED_LOCK_DB_TABLE,
new ConfigurationChangeListener() {
+ configuration.addConfigListener(DISTRIBUTED_LOCK_DB_TABLE, new
CachedConfigurationChangeListener() {
@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
String newValue = event.getNewValue();
@@ -102,7 +101,7 @@ public class DataBaseDistributedLocker implements
DistributedLocker {
distributedLockTable = newValue;
init();
demotion = false;
-
ConfigurationCache.removeConfigListener(DISTRIBUTED_LOCK_DB_TABLE, this);
+
ConfigurationFactory.getInstance().removeConfigListener(DISTRIBUTED_LOCK_DB_TABLE,
this);
}
}
});
diff --git
a/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java
b/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java
index b6d7c22686..cc4c643a6c 100644
---
a/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java
+++
b/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java
@@ -23,14 +23,17 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableSet;
+import org.aopalliance.aop.Advice;
+import org.aopalliance.intercept.MethodInterceptor;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.StringUtils;
-import org.apache.seata.config.ConfigurationCache;
+import org.apache.seata.config.CachedConfigurationChangeListener;
import org.apache.seata.config.ConfigurationChangeEvent;
-import org.apache.seata.config.ConfigurationChangeListener;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.core.rpc.ShutdownHook;
@@ -54,9 +57,6 @@ import org.apache.seata.spring.util.SpringProxyUtils;
import org.apache.seata.tm.TMClient;
import org.apache.seata.tm.api.FailureHandler;
import org.apache.seata.tm.api.FailureHandlerHolder;
-import org.aopalliance.aop.Advice;
-import org.aopalliance.intercept.MethodInterceptor;
-import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.Advisor;
@@ -86,7 +86,7 @@ import static
org.apache.seata.common.DefaultValues.DEFAULT_TX_GROUP_OLD;
*
*/
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
- implements ConfigurationChangeListener, InitializingBean,
ApplicationContextAware, DisposableBean {
+ implements CachedConfigurationChangeListener, InitializingBean,
ApplicationContextAware, DisposableBean {
private static final long serialVersionUID = 1L;
@@ -473,8 +473,7 @@ public class GlobalTransactionScanner extends
AbstractAutoProxyCreator
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
-
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener) this);
+
ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(CachedConfigurationChangeListener) this);
return;
}
if (initialized.compareAndSet(false, true)) {
@@ -561,7 +560,7 @@ public class GlobalTransactionScanner extends
AbstractAutoProxyCreator
LOGGER.info("{} config changed, old value:true, new value:{}",
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
event.getNewValue());
initClient();
-
ConfigurationCache.removeConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
this);
+
ConfigurationFactory.getInstance().removeConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
this);
}
}
}
diff --git
a/test/src/test/java/org/apache/seata/common/ConfigurationTestHelper.java
b/test/src/test/java/org/apache/seata/common/ConfigurationTestHelper.java
index 308d7211f3..d8708605e0 100644
--- a/test/src/test/java/org/apache/seata/common/ConfigurationTestHelper.java
+++ b/test/src/test/java/org/apache/seata/common/ConfigurationTestHelper.java
@@ -16,14 +16,15 @@
*/
package org.apache.seata.common;
-import org.apache.seata.config.ConfigurationCache;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.seata.config.CachedConfigurationChangeListener;
+import org.apache.seata.config.ConfigurationChangeEvent;
import org.apache.seata.config.ConfigurationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* the type ConfigurationTestHelper
**/
@@ -38,7 +39,13 @@ public class ConfigurationTestHelper {
public static void putConfig(String dataId, String content) {
CountDownLatch countDownLatch = new CountDownLatch(1);
-
ConfigurationCache.addConfigListener(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL,
event -> countDownLatch.countDown());
+
ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL,
+ new CachedConfigurationChangeListener() {
+ @Override
+ public void onChangeEvent(ConfigurationChangeEvent event) {
+ countDownLatch.countDown();
+ }
+ });
if (content == null) {
System.clearProperty(dataId);
ConfigurationFactory.getInstance().removeConfig(dataId);
diff --git
a/test/src/test/java/org/apache/seata/saga/engine/db/AbstractServerTest.java
b/test/src/test/java/org/apache/seata/saga/engine/db/AbstractServerTest.java
index bb26a62dca..670261d741 100644
--- a/test/src/test/java/org/apache/seata/saga/engine/db/AbstractServerTest.java
+++ b/test/src/test/java/org/apache/seata/saga/engine/db/AbstractServerTest.java
@@ -25,6 +25,7 @@ import org.apache.seata.common.XID;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.core.rpc.ShutdownHook;
import org.apache.seata.core.rpc.netty.NettyRemotingServer;
+import org.apache.seata.core.rpc.netty.NettyServerConfig;
import org.apache.seata.server.ParameterParser;
import org.apache.seata.server.UUIDGenerator;
import org.apache.seata.server.coordinator.DefaultCoordinator;
@@ -55,7 +56,9 @@ public abstract class AbstractServerTest {
//initialize the metrics
MetricsManager.get().init();
- nettyServer = new NettyRemotingServer(workingThreads);
+ NettyServerConfig nettyServerConfig = new NettyServerConfig();
+ nettyServerConfig.setServerListenPort(8091);
+ nettyServer = new NettyRemotingServer(workingThreads,
nettyServerConfig);
UUIDGenerator.init(parameterParser.getServerNode());
//log store mode : file、db
SessionHolder.init();
@@ -63,6 +66,7 @@ public abstract class AbstractServerTest {
DefaultCoordinator coordinator =
DefaultCoordinator.getInstance(nettyServer);
coordinator.init();
nettyServer.setHandler(coordinator);
+
// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]