This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 7accdf8d4a feature: Introduce Cleanup API for `TableMetaRefreshHolder`
Instance (#7559)
7accdf8d4a is described below
commit 7accdf8d4ac0705800bc0872188b9471eab7945c
Author: Yongjun Hong <[email protected]>
AuthorDate: Mon Sep 22 15:52:53 2025 +0900
feature: Introduce Cleanup API for `TableMetaRefreshHolder` Instance (#7559)
---
changes/en-us/2.x.md | 3 +-
changes/zh-cn/2.x.md | 1 +
.../seata/rm/datasource/DataSourceProxy.java | 5 ++++
.../sql/struct/TableMetaCacheFactory.java | 22 ++++++++++++++-
.../seata/rm/datasource/DataSourceProxyTest.java | 25 ++++++++++++++++
.../sql/struct/TableMetaCacheFactoryTest.java | 33 ++++++++++++++++++++++
6 files changed, 87 insertions(+), 2 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 3e3d475f43..890dc420f0 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -23,7 +23,8 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] Add http
request filter for seata-server
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] Reuse
connection to merge branch transactions
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] upgrade HTTP
client in common module to support HTTP/2
- [[#7551](https://github.com/apache/incubator-seata/pull/7551)] XAUtils add
support for DM Database
+- [[#7551](https://github.com/apache/incubator-seata/pull/7551)] XAUtils add
support for DM Database
+- [[#7559](https://github.com/apache/incubator-seata/pull/7559)] Introduce
Cleanup API for TableMetaRefreshHolder Instance
### bugfix:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 4f4166aac9..6eb04046d7 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -24,6 +24,7 @@
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] 复用连接合并分支事务
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] 升级 common
模块中的 HTTP 客户端以支持 HTTP/2
- [[#7551](https://github.com/apache/incubator-seata/pull/7551)] XAUtils支持达梦数据库
+- [[#7559](https://github.com/apache/incubator-seata/pull/7559)] 为
TableMetaRefreshHolder 实例引入清理 API
### bugfix:
diff --git
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java
index 5ec04101d9..fc0ef6aec7 100644
---
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java
+++
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java
@@ -452,4 +452,9 @@ public class DataSourceProxy extends
AbstractDataSourceProxy implements Resource
LOGGER.error("check mysql version fail error: {}", e.getMessage());
}
}
+
+ public void close() throws Exception {
+ // TODO: Need to unregister resource from DefaultResourceManager
+ TableMetaCacheFactory.shutdown(resourceId);
+ }
}
diff --git
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactory.java
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactory.java
index 5cf5055c77..33dbb67795 100644
---
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactory.java
+++
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactory.java
@@ -108,7 +108,19 @@ public class TableMetaCacheFactory {
LOGGER.info("Removed TableMetaRefreshHolder for resourceId: {}",
resourceId);
}
+ /**
+ * Shutdown all TableMetaRefreshHolder threads.
+ */
+ public static void shutdown(String resourceId) {
+ TableMetaRefreshHolder holder =
TABLE_META_REFRESH_HOLDER_MAP.remove(resourceId);
+ if (holder != null) {
+ holder.shutdown();
+ LOGGER.info("TableMetaRefreshHolder for resourceId: {} has been
shutdown.", resourceId);
+ }
+ }
+
static class TableMetaRefreshHolder {
+ private volatile boolean stopped = false;
private long lastRefreshFinishTime;
private DataSourceProxy dataSource;
private BlockingQueue<Long> tableMetaRefreshQueue;
@@ -128,7 +140,7 @@ public class TableMetaCacheFactory {
this.tableMetaRefreshQueue = new
LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
tableMetaRefreshExecutor.execute(() -> {
- while (true) {
+ while (!stopped) {
// 1. check table meta
if (ENABLE_TABLE_META_CHECKER_ENABLE
&& System.nanoTime() - lastRefreshFinishTime
@@ -187,5 +199,13 @@ public class TableMetaCacheFactory {
}
return StringUtils.isNotBlank(message) &&
message.contains("datasource") && message.contains("close");
}
+
+ private void shutdown() {
+ stopped = true;
+ if (tableMetaRefreshExecutor instanceof ThreadPoolExecutor) {
+ ((ThreadPoolExecutor) tableMetaRefreshExecutor).shutdownNow();
+ }
+ LOGGER.info("TableMetaRefreshHolder shutdown for resourceId: {}",
dataSource.getResourceId());
+ }
}
}
diff --git
a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/DataSourceProxyTest.java
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/DataSourceProxyTest.java
index a3d2a78edc..67c07514fb 100644
---
a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/DataSourceProxyTest.java
+++
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/DataSourceProxyTest.java
@@ -17,6 +17,7 @@
package org.apache.seata.rm.datasource;
import com.alibaba.druid.pool.DruidDataSource;
+import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.datasource.mock.MockDataSource;
import org.apache.seata.rm.datasource.mock.MockDriver;
import org.apache.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
@@ -215,4 +216,28 @@ public class DataSourceProxyTest {
return new DataSourceProxy(dataSource);
}
}
+
+ @Test
+ public void testCloseRemovesResource() throws Exception {
+ final MockDriver mockDriver = new MockDriver();
+ final String username = "username";
+ final String jdbcUrl = "jdbc:mock:xxx";
+
+ final DruidDataSource dataSource = new DruidDataSource();
+ dataSource.setUrl(jdbcUrl);
+ dataSource.setDriver(mockDriver);
+ dataSource.setUsername(username);
+ dataSource.setPassword("password");
+
+ DataSourceProxy proxy = getDataSourceProxy(dataSource);
+ try (MockedStatic<DefaultResourceManager> drmStatic =
Mockito.mockStatic(DefaultResourceManager.class);
+ MockedStatic<TableMetaCacheFactory> tmcfStatic =
Mockito.mockStatic(TableMetaCacheFactory.class)) {
+ DefaultResourceManager drm =
Mockito.mock(DefaultResourceManager.class);
+ drmStatic.when(DefaultResourceManager::get).thenReturn(drm);
+
+ proxy.close();
+
+ tmcfStatic.verify(() ->
TableMetaCacheFactory.shutdown(proxy.getResourceId()));
+ }
+ }
}
diff --git
a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactoryTest.java
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactoryTest.java
index 5664a5964a..18144391af 100644
---
a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactoryTest.java
+++
b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/sql/struct/TableMetaCacheFactoryTest.java
@@ -17,6 +17,8 @@
package org.apache.seata.rm.datasource.sql.struct;
import org.apache.seata.common.loader.EnhancedServiceNotFoundException;
+import org.apache.seata.rm.datasource.DataSourceProxy;
+import org.apache.seata.rm.datasource.mock.MockDataSource;
import org.apache.seata.rm.datasource.sql.struct.cache.MariadbTableMetaCache;
import org.apache.seata.rm.datasource.sql.struct.cache.MysqlTableMetaCache;
import org.apache.seata.rm.datasource.sql.struct.cache.OceanBaseTableMetaCache;
@@ -24,12 +26,24 @@ import
org.apache.seata.rm.datasource.sql.struct.cache.OracleTableMetaCache;
import org.apache.seata.rm.datasource.sql.struct.cache.PolarDBXTableMetaCache;
import org.apache.seata.sqlparser.util.JdbcConstants;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.Field;
+import java.util.Map;
+
public class TableMetaCacheFactoryTest {
private static final String NOT_EXIST_SQL_TYPE = "not_exist_sql_type";
+ @BeforeEach
+ public void clearTableMetaRefreshHolderMap() throws Exception {
+ Field field =
TableMetaCacheFactory.class.getDeclaredField("TABLE_META_REFRESH_HOLDER_MAP");
+ field.setAccessible(true);
+ Map<?, ?> map = (Map<?, ?>) field.get(null);
+ map.clear();
+ }
+
@Test
public void getTableMetaCache() {
Assertions.assertTrue(
@@ -52,4 +66,23 @@ public class TableMetaCacheFactoryTest {
TableMetaCacheFactory.getTableMetaCache(NOT_EXIST_SQL_TYPE);
});
}
+
+ @Test
+ public void shutdownTest() throws NoSuchFieldException,
IllegalAccessException {
+ DataSourceProxy dummy = new DataSourceProxy(new MockDataSource(),
"dummy1");
+ TableMetaCacheFactory.registerTableMeta(dummy);
+
+ Map<String, TableMetaCacheFactory.TableMetaRefreshHolder> holderMap =
getTableMetaRefreshHolderMap();
+ Assertions.assertEquals(1, holderMap.size());
+
+ TableMetaCacheFactory.shutdown("jdbc:mysql://127.0.0.1:3306/seata");
+ Assertions.assertTrue(holderMap.isEmpty(), "TableMetaRefreshHolder map
should be empty after shutdown");
+ }
+
+ private Map<String, TableMetaCacheFactory.TableMetaRefreshHolder>
getTableMetaRefreshHolderMap()
+ throws NoSuchFieldException, IllegalAccessException {
+ Field field =
TableMetaCacheFactory.class.getDeclaredField("TABLE_META_REFRESH_HOLDER_MAP");
+ field.setAccessible(true);
+ return (Map<String, TableMetaCacheFactory.TableMetaRefreshHolder>)
field.get(null);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]