This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 02e1514012 feature:[loom] replace the usages of synchronized with 
ReentrantLock (#7073)
02e1514012 is described below

commit 02e1514012315b87d0693201898a76d6ab66d6b4
Author: 云清 <33415199+lightclouds...@users.noreply.github.com>
AuthorDate: Fri Jan 17 16:06:46 2025 +0800

    feature:[loom] replace the usages of synchronized with ReentrantLock (#7073)
---
 changes/en-us/2.x.md                               |   2 +
 changes/zh-cn/2.x.md                               |   2 +
 .../org/apache/seata/common/lock/ResourceLock.java |  60 +++++++
 .../apache/seata/common/util/UUIDGenerator.java    |   5 +-
 .../apache/seata/common/lock/ResourceLockTest.java | 147 +++++++++++++++++
 .../registry/eureka/EurekaRegistryServiceImpl.java |   7 +-
 .../tx/api/fence/hook/TccHookManager.java          |   5 +-
 .../api/remoting/parser/DefaultRemotingParser.java |   5 +-
 .../seata/integration/tx/api/util/ProxyUtil.java   |   4 +-
 .../apache/seata/rm/datasource/util/JdbcUtils.java |   4 +-
 .../seata/rm/datasource/xa/ConnectionProxyXA.java  | 178 ++++++++++++---------
 .../seata/rm/datasource/xa/ResourceManagerXA.java  |   6 +-
 .../rocketmq/SeataMQProducerFactory.java           |   5 +-
 .../apache/seata/server/session/GlobalSession.java |   6 +-
 14 files changed, 345 insertions(+), 91 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 5940b569f7..419f3f6337 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -4,6 +4,7 @@ Add changes here for all PR submitted to the 2.x branch.
 
 ### feature:
 
+- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] support virtual threads, replace the usages of synchronized with ReentrantLock
 - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury 
undolog parser
 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster 
mode supports address translation
 - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury 
serializer
@@ -43,6 +44,7 @@ Thanks to these contributors for their code commits. Please 
report an unintended
 - [slievrly](https://github.com/slievrly)
 - [lyl2008dsg](https://github.com/lyl2008dsg)
 - [remind](https://github.com/remind)
+- [lightClouds917](https://github.com/lightClouds917)
 - [GoodBoyCoder](https://github.com/GoodBoyCoder)
 - [PeppaO](https://github.com/PeppaO)
 - [funky-eyes](https://github.com/funky-eyes)
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 13e4e53183..2f7d72c5ef 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -4,6 +4,7 @@
 
 ### feature:
 
+- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] 
支持虚拟线程,用ReentrantLock替换synchronized的用法
 - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 
支持UndoLog的fury序列化方式
 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换
 - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] 支持Fury序列化器
@@ -42,6 +43,7 @@
 - [slievrly](https://github.com/slievrly)
 - [lyl2008dsg](https://github.com/lyl2008dsg)
 - [remind](https://github.com/remind)
+- [lightClouds917](https://github.com/lightClouds917)
 - [GoodBoyCoder](https://github.com/GoodBoyCoder)
 - [PeppaO](https://github.com/PeppaO)
 - [funky-eyes](https://github.com/funky-eyes)
diff --git a/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java
new file mode 100644
index 0000000000..22e815e578
--- /dev/null
+++ b/common/src/main/java/org/apache/seata/common/lock/ResourceLock.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.lock;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The ResourceLock extends ReentrantLock and implements AutoCloseable,
+ * allowing it to be used in try-with-resources blocks without needing
+ * to unlock in a finally block.
+ *
+ * <h3>Example</h3>
+ * <pre>
+ * {@code
+ *   private final ResourceLock resourceLock = new ResourceLock();
+ *   try (ResourceLock lock = resourceLock.obtain()) {
+ *     // do something while holding the resource lock
+ *   }
+ * }
+ * </pre>
+ */
+public class ResourceLock extends ReentrantLock implements AutoCloseable {
+
+    /**
+     * Acquires this lock by calling {@link #lock()}.
+     *
+     * @return this ResourceLock, so the call can open a try-with-resources block
+     */
+    public ResourceLock obtain() {
+        lock();
+        return this;
+    }
+
+
+    /**
+     * Unlock the resource lock.
+     *
+     * <p>This is typically used in try-with-resources blocks to automatically
+     * unlock the resource lock when the block is exited, regardless of whether
+     * an exception is thrown or not.
+     */
+    @Override
+    public void close() {
+        this.unlock();
+    }
+}
diff --git 
a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java 
b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java
index 542de3ed1e..7b8ea72759 100644
--- a/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java
+++ b/common/src/main/java/org/apache/seata/common/util/UUIDGenerator.java
@@ -16,12 +16,15 @@
  */
 package org.apache.seata.common.util;
 
+import org.apache.seata.common.lock.ResourceLock;
+
 /**
  * The type Uuid generator.
  */
 public class UUIDGenerator {
 
     private static volatile IdWorker idWorker;
+    private final static ResourceLock RESOURCE_LOCK = new ResourceLock();
 
     /**
      * generate UUID using snowflake algorithm
@@ -30,7 +33,7 @@ public class UUIDGenerator {
      */
     public static long generateUUID() {
         if (idWorker == null) {
-            synchronized (UUIDGenerator.class) {
+            try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
                 if (idWorker == null) {
                     init(null);
                 }
diff --git 
a/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java 
b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java
new file mode 100644
index 0000000000..0ed2e7e2b7
--- /dev/null
+++ b/common/src/test/java/org/apache/seata/common/lock/ResourceLockTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.lock;
+
+import org.apache.seata.common.util.CollectionUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+public class ResourceLockTest {
+
+    @Test
+    public void testObtainAndClose() {
+        ResourceLock resourceLock = new ResourceLock();
+
+        // Test obtaining the lock
+        try (ResourceLock lock = resourceLock.obtain()) {
+            assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
+        }
+
+        // After try-with-resources, lock should be released
+        assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after try-with-resources");
+    }
+
+    @Test
+    public void testMultipleObtainAndClose() {
+        ResourceLock resourceLock = new ResourceLock();
+
+        // First obtain and release
+        try (ResourceLock lock = resourceLock.obtain()) {
+            assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
+        }
+        assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after first try-with-resources");
+
+        // Second obtain and release
+        try (ResourceLock lock = resourceLock.obtain()) {
+            assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
+        }
+        assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after second try-with-resources");
+    }
+
+    @Test
+    public void testResourceLockAutoRemovalFromMap() {
+        ConcurrentHashMap<String, ResourceLock> lockMap = new ConcurrentHashMap<>();
+        String key = "testKey";
+        // Use try-with-resources to obtain and release the lock
+        try (ResourceLock ignored = CollectionUtils.computeIfAbsent(lockMap, key, k -> new ResourceLock()).obtain()) {
+            // Do something while holding the lock
+            assertTrue(lockMap.containsKey(key));
+            assertTrue(lockMap.get(key).isHeldByCurrentThread());
+        } finally {
+            assertFalse(lockMap.get(key).isHeldByCurrentThread());
+            assertTrue(lockMap.containsKey(key));
+            // Remove the lock from the map
+            lockMap.remove(key);
+            assertFalse(lockMap.containsKey(key));
+        }
+        // Ensure the lock is removed from the map
+        assertFalse(lockMap.containsKey(key));
+    }
+
+    @Test
+    public void testConcurrentLocking() throws InterruptedException {
+        ResourceLock resourceLock = new ResourceLock();
+
+        Thread t1 = new Thread(() -> {
+            try (ResourceLock lock = resourceLock.obtain()) {
+                try {
+                    Thread.sleep(100); // Hold the lock for 100ms
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+
+        Thread t2 = new Thread(() -> {
+            assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should not be held by current thread before t1 releases it");
+            try (ResourceLock lock = resourceLock.obtain()) {
+                assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread after t1 releases it");
+            }
+        });
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after both threads complete");
+    }
+
+    @Test
+    public void testLockInterruptibly() throws InterruptedException {
+        ResourceLock resourceLock = new ResourceLock();
+
+        Thread t1 = new Thread(() -> {
+            try (ResourceLock lock = resourceLock.obtain()) {
+                try {
+                    Thread.sleep(1000); // Hold the lock for 1000ms
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+
+        t1.start();
+        Thread.sleep(50); // Wait for t1 to acquire the lock
+
+        Thread t2 = new Thread(() -> {
+            try {
+                resourceLock.lockInterruptibly();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        t2.start();
+        Thread.sleep(50); // Wait for t2 to attempt to acquire the lock
+
+        t2.interrupt(); // Interrupt t2
+
+        t1.join();
+        t2.join();
+
+        assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after t1 completes");
+    }
+}
diff --git 
a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
 
b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
index 5ab5191234..ef441c34bb 100644
--- 
a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
@@ -26,6 +26,7 @@ import com.netflix.discovery.EurekaClient;
 import com.netflix.discovery.EurekaEventListener;
 import com.netflix.discovery.shared.Application;
 import org.apache.seata.common.exception.EurekaRegistryException;
+import org.apache.seata.common.lock.ResourceLock;
 import org.apache.seata.common.util.CollectionUtils;
 import org.apache.seata.common.util.NetUtil;
 import org.apache.seata.common.util.StringUtils;
@@ -68,7 +69,7 @@ public class EurekaRegistryServiceImpl implements 
RegistryService<EurekaEventLis
     private static final Configuration FILE_CONFIG = 
ConfigurationFactory.CURRENT_FILE_INSTANCE;
     private static final ConcurrentMap<String, List<EurekaEventListener>> 
LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
     private static final ConcurrentMap<String, List<InetSocketAddress>> 
CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>();
-    private static final ConcurrentMap<String, Object> CLUSTER_LOCK = new 
ConcurrentHashMap<>();
+    private static final ConcurrentMap<String, ResourceLock> CLUSTER_LOCK = 
new ConcurrentHashMap<>();
 
     private static volatile ApplicationInfoManager applicationInfoManager;
     private static volatile CustomEurekaInstanceConfig instanceConfig;
@@ -140,8 +141,8 @@ public class EurekaRegistryServiceImpl implements 
RegistryService<EurekaEventLis
         }
         String clusterUpperName = clusterName.toUpperCase();
         if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) {
-            Object lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> 
new Object());
-            synchronized (lock) {
+            ResourceLock lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, 
k -> new ResourceLock());
+            try (ResourceLock ignored = lock.obtain()) {
                 if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) {
                     refreshCluster(clusterUpperName);
                     subscribe(clusterUpperName, event -> {
diff --git 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java
 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java
index e6d537c73f..4a1048c40c 100644
--- 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java
+++ 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java
@@ -20,11 +20,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.seata.common.lock.ResourceLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class TccHookManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TccHookManager.class);
+    private static final ResourceLock LOCK = new ResourceLock();
+
 
     private TccHookManager() {
 
@@ -40,7 +43,7 @@ public final class TccHookManager {
      */
     public static List<TccHook> getHooks() {
         if (CACHED_UNMODIFIABLE_HOOKS == null) {
-            synchronized (TccHookManager.class) {
+            try (ResourceLock ignored = LOCK.obtain()) {
                 if (CACHED_UNMODIFIABLE_HOOKS == null) {
                     CACHED_UNMODIFIABLE_HOOKS = 
Collections.unmodifiableList(TCC_HOOKS);
                 }
diff --git 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java
 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java
index 0ed9625e61..9a1f8d307d 100644
--- 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java
+++ 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/remoting/parser/DefaultRemotingParser.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.seata.common.exception.FrameworkException;
 import org.apache.seata.common.loader.EnhancedServiceLoader;
+import org.apache.seata.common.lock.ResourceLock;
 import org.apache.seata.common.util.CollectionUtils;
 import org.apache.seata.integration.tx.api.remoting.RemotingDesc;
 import org.apache.seata.integration.tx.api.remoting.RemotingParser;
@@ -43,6 +44,8 @@ public class DefaultRemotingParser {
      */
     protected static Map<Object, RemotingDesc> remotingServiceMap = new 
ConcurrentHashMap<>();
 
+    private final ResourceLock resourceLock = new ResourceLock();
+
     private static class SingletonHolder {
         private static final DefaultRemotingParser INSTANCE = new 
DefaultRemotingParser();
     }
@@ -79,7 +82,7 @@ public class DefaultRemotingParser {
      * @param remotingParser
      */
     public boolean registerRemotingParser(RemotingParser remotingParser) {
-        synchronized (this) {
+        try (ResourceLock ignored = resourceLock.obtain()) {
             return allRemotingParsers.add(remotingParser);
         }
     }
diff --git 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java
 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java
index 964840c993..c6c5ffdef4 100644
--- 
a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java
+++ 
b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/util/ProxyUtil.java
@@ -16,6 +16,7 @@
  */
 package org.apache.seata.integration.tx.api.util;
 
+import org.apache.seata.common.lock.ResourceLock;
 import 
org.apache.seata.integration.tx.api.interceptor.handler.DefaultInvocationHandler;
 import 
org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
 import 
org.apache.seata.integration.tx.api.interceptor.parser.DefaultInterfaceParser;
@@ -31,6 +32,7 @@ import static 
net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
 public class ProxyUtil {
 
     private static final Map<Object, Object> PROXYED_SET = new HashMap<>();
+    private static final ResourceLock RESOURCE_LOCK = new ResourceLock();
 
     public static <T> T createProxy(T target) {
         return createProxy(target, target.getClass().getName());
@@ -53,7 +55,7 @@ public class ProxyUtil {
      */
     public static <T> T createProxy(T target, String beanName) {
         try {
-            synchronized (PROXYED_SET) {
+            try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
                 if (PROXYED_SET.containsKey(target)) {
                     return (T) PROXYED_SET.get(target);
                 }
diff --git 
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java
 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java
index 2e5c8b1f33..1ee4c1ef63 100644
--- 
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java
+++ 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/util/JdbcUtils.java
@@ -17,6 +17,7 @@
 package org.apache.seata.rm.datasource.util;
 
 import org.apache.seata.common.loader.EnhancedServiceLoader;
+import org.apache.seata.common.lock.ResourceLock;
 import org.apache.seata.rm.BaseDataSourceResource;
 import org.apache.seata.rm.DefaultResourceManager;
 import org.apache.seata.sqlparser.SqlParserType;
@@ -33,10 +34,11 @@ import java.sql.SQLException;
 public final class JdbcUtils {
 
     private static volatile DbTypeParser dbTypeParser;
+    private final static ResourceLock RESOURCE_LOCK = new ResourceLock();
 
     static DbTypeParser getDbTypeParser() {
         if (dbTypeParser == null) {
-            synchronized (JdbcUtils.class) {
+            try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
                 if (dbTypeParser == null) {
                     dbTypeParser = 
EnhancedServiceLoader.load(DbTypeParser.class, 
SqlParserType.SQL_PARSER_TYPE_DRUID);
                 }
diff --git 
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
index 6d35dfd630..709263d88c 100644
--- 
a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
+++ 
b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ConnectionProxyXA.java
@@ -24,6 +24,7 @@ import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 
 import org.apache.seata.common.DefaultValues;
+import org.apache.seata.common.lock.ResourceLock;
 import org.apache.seata.common.util.StringUtils;
 import org.apache.seata.config.ConfigurationFactory;
 import org.apache.seata.core.exception.TransactionException;
@@ -69,6 +70,8 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
 
     private boolean shouldBeHeld = false;
 
+    private final ResourceLock resourceLock = new ResourceLock();
+
     /**
      * Constructor of Connection Proxy for XA mode.
      *
@@ -127,10 +130,12 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
      * @param applicationData application data
      * @throws SQLException SQLException
      */
-    public synchronized void xaCommit(String xid, long branchId, String 
applicationData) throws XAException {
-        XAXid xaXid = XAXidBuilder.build(xid, branchId);
-        xaResource.commit(xaXid, false);
-        releaseIfNecessary();
+    public void xaCommit(String xid, long branchId, String applicationData) 
throws XAException {
+        try (ResourceLock ignored = resourceLock.obtain()) {
+            XAXid xaXid = XAXidBuilder.build(xid, branchId);
+            xaResource.commit(xaXid, false);
+            releaseIfNecessary();
+        }
     }
 
     /**
@@ -139,12 +144,14 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
      * @param branchId transaction branch id
      * @param applicationData application data
      */
-    public synchronized void xaRollback(String xid, long branchId, String 
applicationData) throws XAException {
-        if (this.xaBranchXid != null) {
-            xaRollback(xaBranchXid);
-        } else {
-            XAXid xaXid = XAXidBuilder.build(xid, branchId);
-            xaRollback(xaXid);
+    public void xaRollback(String xid, long branchId, String applicationData) 
throws XAException {
+        try (ResourceLock ignored = resourceLock.obtain()) {
+            if (this.xaBranchXid != null) {
+                xaRollback(xaBranchXid);
+            } else {
+                XAXid xaXid = XAXidBuilder.build(xid, branchId);
+                xaRollback(xaXid);
+            }
         }
     }
 
@@ -214,43 +221,45 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
     }
 
     @Override
-    public synchronized void commit() throws SQLException {
-        if (currentAutoCommitStatus || isReadOnly()) {
-            // Ignore the committing on an autocommit session and read-only 
transaction.
-            return;
-        }
-        if (!xaActive || this.xaBranchXid == null) {
-            throw new SQLException("should NOT commit on an inactive session", 
SQLSTATE_XA_NOT_END);
-        }
-        try {
-            // XA End: Success
-            try {
-                end(XAResource.TMSUCCESS);
-            } catch (SQLException sqle) {
-                // Rollback immediately before the XA Branch Context is 
deleted.
-                String xaBranchXid = this.xaBranchXid.toString();
-                rollback();
-                throw new SQLException("Branch " + xaBranchXid + " was 
rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, 
sqle);
+    public void commit() throws SQLException {
+        try (ResourceLock ignored = resourceLock.obtain()) {
+            if (currentAutoCommitStatus || isReadOnly()) {
+                // Ignore the committing on an autocommit session and 
read-only transaction.
+                return;
             }
-            long now = System.currentTimeMillis();
-            checkTimeout(now);
-            setPrepareTime(now);
-            int prepare = xaResource.prepare(xaBranchXid);
-            // Based on the four databases: MySQL (8), Oracle (12c), Postgres 
(16), and MSSQL Server (2022),
-            // only Oracle has read-only optimization; the others do not 
provide read-only feedback.
-            // Therefore, the database type check can be eliminated here.
-            if (prepare == XAResource.XA_RDONLY) {
-                // Branch Report to TC: RDONLY
-                reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
+            if (!xaActive || this.xaBranchXid == null) {
+                throw new SQLException("should NOT commit on an inactive 
session", SQLSTATE_XA_NOT_END);
+            }
+            try {
+                // XA End: Success
+                try {
+                    end(XAResource.TMSUCCESS);
+                } catch (SQLException sqle) {
+                    // Rollback immediately before the XA Branch Context is 
deleted.
+                    String xaBranchXid = this.xaBranchXid.toString();
+                    rollback();
+                    throw new SQLException("Branch " + xaBranchXid + " was 
rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, 
sqle);
+                }
+                long now = System.currentTimeMillis();
+                checkTimeout(now);
+                setPrepareTime(now);
+                int prepare = xaResource.prepare(xaBranchXid);
+                // Based on the four databases: MySQL (8), Oracle (12c), 
Postgres (16), and MSSQL Server (2022),
+                // only Oracle has read-only optimization; the others do not 
provide read-only feedback.
+                // Therefore, the database type check can be eliminated here.
+                if (prepare == XAResource.XA_RDONLY) {
+                    // Branch Report to TC: RDONLY
+                    reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
+                }
+            } catch (XAException xe) {
+                // Branch Report to TC: Failed
+                reportStatusToTC(BranchStatus.PhaseOne_Failed);
+                throw new SQLException(
+                        "Failed to end(TMSUCCESS)/prepare xa branch on " + xid 
+ "-" + xaBranchXid.getBranchId() + " since " + xe
+                                .getMessage(), xe);
+            } finally {
+                cleanXABranchContext();
             }
-        } catch (XAException xe) {
-            // Branch Report to TC: Failed
-            reportStatusToTC(BranchStatus.PhaseOne_Failed);
-            throw new SQLException(
-                "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + 
xaBranchXid.getBranchId() + " since " + xe
-                    .getMessage(), xe);
-        } finally {
-            cleanXABranchContext();
         }
     }
 
@@ -280,23 +289,25 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
         }
     }
 
-    private synchronized void start() throws XAException, SQLException {
-        // 3. XA Start
-        if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
-            xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE);
-        } else {
-            xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
-        }
+    private void start() throws XAException, SQLException {
+        try (ResourceLock ignored = resourceLock.obtain()) {
+            // 3. XA Start
+            if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
+                xaResource.start(this.xaBranchXid, 
SeataXAResource.ORATRANSLOOSE);
+            } else {
+                xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
+            }
 
-        try {
-            termination();
-        } catch (SQLException e) {
-            // the framework layer does not actively call ROLLBACK when 
setAutoCommit throws an SQL exception
-            xaResource.end(this.xaBranchXid, XAResource.TMFAIL);
-            xaRollback(xaBranchXid);
-            // Branch Report to TC: Failed
-            reportStatusToTC(BranchStatus.PhaseOne_Failed);
-            throw  e;
+            try {
+                termination();
+            } catch (SQLException e) {
+                // the framework layer does not actively call ROLLBACK when 
setAutoCommit throws an SQL exception
+                xaResource.end(this.xaBranchXid, XAResource.TMFAIL);
+                xaRollback(xaBranchXid);
+                // Branch Report to TC: Failed
+                reportStatusToTC(BranchStatus.PhaseOne_Failed);
+                throw e;
+            }
         }
     }
 
@@ -323,27 +334,31 @@ public class ConnectionProxyXA extends 
AbstractConnectionProxyXA implements Hold
     }
 
     @Override
-    public synchronized void close() throws SQLException {
-        rollBacked = false;
-        if (isHeld() && shouldBeHeld()) {
-            // if kept by a keeper, just hold the connection.
-            return;
+    public void close() throws SQLException {
+        try (ResourceLock ignored = resourceLock.obtain()) {
+            rollBacked = false;
+            if (isHeld() && shouldBeHeld()) {
+                // if kept by a keeper, just hold the connection.
+                return;
+            }
+            cleanXABranchContext();
+            originalConnection.close();
         }
-        cleanXABranchContext();
-        originalConnection.close();
     }
 
-    protected synchronized void closeForce() throws SQLException {
-        Connection physicalConn = getWrappedConnection();
-        if (physicalConn instanceof PooledConnection) {
-            physicalConn = ((PooledConnection) physicalConn).getConnection();
+    protected void closeForce() throws SQLException {
+        try (ResourceLock ignored = resourceLock.obtain()) {
+            Connection physicalConn = getWrappedConnection();
+            if (physicalConn instanceof PooledConnection) {
+                physicalConn = ((PooledConnection) 
physicalConn).getConnection();
+            }
+            // Force close the physical connection
+            physicalConn.close();
+            rollBacked = false;
+            cleanXABranchContext();
+            originalConnection.close();
+            releaseIfNecessary();
         }
-        // Force close the physical connection
-        physicalConn.close();
-        rollBacked = false;
-        cleanXABranchContext();
-        originalConnection.close();
-        releaseIfNecessary();
     }
 
     @Override
@@ -398,4 +413,11 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold
         }
     }
 
+    /**
+     * Get the lock of the current connection
+     * @return the RESOURCE_LOCK
+     */
+    public ResourceLock getResourceLock() {
+        return resourceLock;
+    }
 }
diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java
index 8added9ff6..8d44495ddb 100644
--- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java
+++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/xa/ResourceManagerXA.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import java.sql.SQLException;
 import javax.transaction.xa.XAException;
 import org.apache.seata.common.DefaultValues;
+import org.apache.seata.common.lock.ResourceLock;
 import org.apache.seata.common.thread.NamedThreadFactory;
 import org.apache.seata.config.ConfigurationFactory;
 import org.apache.seata.core.exception.TransactionException;
@@ -53,6 +54,7 @@ public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager {
      * The Timer check xa branch two phase hold timeout.
      */
     protected volatile ScheduledExecutorService xaTwoPhaseTimeoutChecker;
+    private final ResourceLock resourceLock = new ResourceLock();
 
     @Override
     public void init() {
@@ -61,7 +63,7 @@ public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager {
 
     public void initXaTwoPhaseTimeoutChecker() {
         if (xaTwoPhaseTimeoutChecker == null) {
-            synchronized (this) {
+            try (ResourceLock ignored = resourceLock.obtain()) {
                 if (xaTwoPhaseTimeoutChecker == null) {
                     boolean shouldBeHold = dataSourceCache.values().parallelStream().anyMatch(resource -> {
                         if (resource instanceof DataSourceProxyXA) {
@@ -81,7 +83,7 @@ public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager {
                                         for (Map.Entry<String, ConnectionProxyXA> connectionEntry : keeper.entrySet()) {
                                             ConnectionProxyXA connection = connectionEntry.getValue();
                                             long now = System.currentTimeMillis();
-                                            synchronized (connection) {
+                                            try (ResourceLock ignored2 = connection.getResourceLock().obtain()) {
                                                 if (connection.getPrepareTime() != null
                                                     && now - connection.getPrepareTime() > TWO_PHASE_HOLD_TIMEOUT) {
                                                     try {
diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java
index 63414aa129..c18e6f758a 100644
--- a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java
+++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java
@@ -19,6 +19,7 @@ package org.apache.seata.integration.rocketmq;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.seata.common.exception.NotSupportYetException;
+import org.apache.seata.common.lock.ResourceLock;
 import org.apache.seata.core.model.BranchType;
 import org.apache.seata.integration.tx.api.util.ProxyUtil;
 
@@ -29,7 +30,7 @@ public class SeataMQProducerFactory {
 
     public static final String ROCKET_TCC_NAME = "tccRocketMQ";
     public static final BranchType ROCKET_BRANCH_TYPE = BranchType.TCC;
-
+    private static final ResourceLock RESOURCE_LOCK = new ResourceLock();
     /**
      * Default Producer, it can be replaced to Map after multi-resource is supported
      */
@@ -42,7 +43,7 @@ public class SeataMQProducerFactory {
     public static SeataMQProducer createSingle(String nameServer, String namespace,
                                                String groupName, RPCHook rpcHook) throws MQClientException {
         if (defaultProducer == null) {
-            synchronized (SeataMQProducerFactory.class) {
+            try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
                 if (defaultProducer == null) {
                     defaultProducer = new SeataMQProducer(namespace, groupName, rpcHook);
                     defaultProducer.setNamesrvAddr(nameServer);
diff --git a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
index 251b9876e5..57ee9eea98 100644
--- a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
+++ b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
@@ -31,6 +31,7 @@ import org.apache.seata.common.ConfigurationKeys;
 import org.apache.seata.common.Constants;
 import org.apache.seata.common.DefaultValues;
 import org.apache.seata.common.XID;
+import org.apache.seata.common.lock.ResourceLock;
 import org.apache.seata.common.util.BufferUtils;
 import org.apache.seata.common.util.StringUtils;
 import org.apache.seata.common.util.UUIDGenerator;
@@ -107,6 +108,9 @@ public class GlobalSession implements SessionLifecycle, SessionStorable {
 
     private Set<SessionLifecycleListener> lifecycleListeners = new HashSet<>(2);
 
+    private final ResourceLock resourceLock = new ResourceLock();
+
+
     /**
      * Add boolean.
      *
@@ -129,7 +133,7 @@ public class GlobalSession implements SessionLifecycle, SessionStorable {
      * @return the boolean
      */
     public boolean remove(BranchSession branchSession) {
-        synchronized (this) {
+        try (ResourceLock ignored = resourceLock.obtain()) {
             return branchSessions.remove(branchSession);
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org


Reply via email to