This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d0e278  Add ConnectionTransaction (#12888)
1d0e278 is described below

commit 1d0e2789f42bb81d46bf4486cc81db82a3d90246
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Oct 2 18:32:04 2021 +0800

    Add ConnectionTransaction (#12888)
    
    * Add ContextManager.getDataSourceMap()
    
    * Fix test cases
    
    * Add ConnectionTransaction
    
    * Add ConnectionTransaction
---
 .../metadata/rule/ShardingSphereRuleMetaData.java  |  13 ++
 .../core/connection/ShardingSphereConnection.java  |  88 ++++++--------
 .../driver/executor/AbstractBaseExecutorTest.java  |   7 +-
 .../driver/jdbc/adapter/ConnectionAdapterTest.java |   7 +-
 .../connection/ShardingSphereConnectionTest.java   |  15 ++-
 .../UnsupportedOperationConnectionTest.java        |  11 +-
 .../driver/state/ok/OKDriverStateTest.java         |  10 +-
 .../transaction/ConnectionTransaction.java         | 132 +++++++++++++++++++++
 8 files changed, 222 insertions(+), 61 deletions(-)

diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/rule/ShardingSphereRuleMetaData.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/rule/ShardingSphereRuleMetaData.java
index 63b564f..e417311 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/rule/ShardingSphereRuleMetaData.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/rule/ShardingSphereRuleMetaData.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.infra.config.RuleConfiguration;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 
 import java.util.Collection;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
@@ -46,4 +47,16 @@ public final class ShardingSphereRuleMetaData {
     public <T extends ShardingSphereRule> Collection<T> findRules(final 
Class<T> clazz) {
         return rules.stream().filter(each -> 
clazz.isAssignableFrom(each.getClass())).map(clazz::cast).collect(Collectors.toList());
     }
+    
+    /**
+     * Find single rule by class.
+     *
+     * @param clazz target class
+     * @param <T> type of rule
+     * @return found single rule
+     */
+    public <T extends ShardingSphereRule> Optional<T> findSingleRule(final 
Class<T> clazz) {
+        Collection<T> foundRules = findRules(clazz);
+        return foundRules.isEmpty() ? Optional.empty() : 
Optional.of(foundRules.iterator().next());
+    }
 }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index b6f5b23..0dae9cb 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -27,11 +27,9 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMod
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.transaction.ConnectionTransaction;
 import org.apache.shardingsphere.transaction.TransactionHolder;
-import org.apache.shardingsphere.transaction.core.TransactionType;
-import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
 import org.apache.shardingsphere.transaction.rule.TransactionRule;
-import 
org.apache.shardingsphere.transaction.spi.ShardingSphereTransactionManager;
 
 import javax.sql.DataSource;
 import java.sql.Array;
@@ -44,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * ShardingSphere Connection.
@@ -56,30 +55,16 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter im
     @Getter
     private final ContextManager contextManager;
     
-    private final TransactionType transactionType;
-    
-    private final ShardingSphereTransactionManager transactionManager;
+    private final ConnectionTransaction connectionTransaction;
     
     private boolean autoCommit = true;
     
     public ShardingSphereConnection(final String schemaName, final 
ContextManager contextManager) {
         this.schemaName = schemaName;
         this.contextManager = contextManager;
-        transactionType = getTransactionType(contextManager);
-        transactionManager = 
contextManager.getTransactionContexts().getEngines().get(schemaName).getTransactionManager(transactionType);
-    }
-    
-    private TransactionType getTransactionType(final ContextManager 
contextManager) {
-        if (null != TransactionTypeHolder.get()) {
-            return TransactionTypeHolder.get();
-        }
-        Collection<TransactionRule> rules = 
contextManager.getMetaDataContexts().getGlobalRuleMetaData().findRules(TransactionRule.class);
-        if (rules.isEmpty()) {
-            return TransactionType.LOCAL;
-        }
-        TransactionType result = rules.iterator().next().getDefaultType();
-        TransactionTypeHolder.set(result);
-        return result;
+        Optional<TransactionRule> transactionRule = 
contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class);
+        connectionTransaction = transactionRule.map(optional -> new 
ConnectionTransaction(schemaName, optional, 
contextManager.getTransactionContexts()))
+                .orElseGet(() -> new ConnectionTransaction(schemaName, 
contextManager.getTransactionContexts()));
     }
     
     /**
@@ -154,11 +139,8 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter im
     }
     
     private Connection createConnection(final String dataSourceName, final 
DataSource dataSource) throws SQLException {
-        return isInTransaction() ? 
transactionManager.getConnection(dataSourceName) : dataSource.getConnection();
-    }
-    
-    private boolean isInTransaction() {
-        return null != transactionManager && 
transactionManager.isInTransaction();
+        Optional<Connection> connectionInTransaction = 
connectionTransaction.getConnection(dataSourceName);
+        return connectionInTransaction.isPresent() ? 
connectionInTransaction.get() : dataSource.getConnection();
     }
     
     /**
@@ -167,7 +149,7 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter im
      * @return true or false
      */
     public boolean isHoldTransaction() {
-        return (TransactionType.LOCAL == transactionType && !autoCommit) || 
(TransactionType.XA == transactionType && isInTransaction());
+        return connectionTransaction.isHoldTransaction(autoCommit);
     }
     
     @SuppressWarnings("MagicConstant")
@@ -241,29 +223,37 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter im
     
     @Override
     public void setAutoCommit(final boolean autoCommit) throws SQLException {
-        if (TransactionType.LOCAL == transactionType) {
-            this.autoCommit = autoCommit;
-            recordMethodInvocation(Connection.class, "setAutoCommit", new 
Class[]{boolean.class}, new Object[]{autoCommit});
-            getForceExecuteTemplate().execute(getCachedConnections().values(), 
connection -> connection.setAutoCommit(autoCommit));
-            if (!autoCommit) {
-                TransactionHolder.setInTransaction();
-            }
-            return;
-        }
-        if (autoCommit != transactionManager.isInTransaction()) {
-            return;
-        }
-        if (autoCommit && transactionManager.isInTransaction()) {
-            transactionManager.commit();
-            return;
+        if (connectionTransaction.isLocalTransaction()) {
+            processLocalTransaction(autoCommit);
+        } else {
+            processDistributeTransaction(autoCommit);
         }
-        if (!autoCommit && !transactionManager.isInTransaction()) {
-            closeCachedConnections();
-            transactionManager.begin();
+    }
+    
+    private void processLocalTransaction(final boolean autoCommit) throws 
SQLException {
+        this.autoCommit = autoCommit;
+        recordMethodInvocation(Connection.class, "setAutoCommit", new 
Class[]{boolean.class}, new Object[]{autoCommit});
+        getForceExecuteTemplate().execute(getCachedConnections().values(), 
connection -> connection.setAutoCommit(autoCommit));
+        if (!autoCommit) {
             TransactionHolder.setInTransaction();
         }
     }
     
+    private void processDistributeTransaction(final boolean autoCommit) throws 
SQLException {
+        switch 
(connectionTransaction.getDistributedTransactionOperationType(autoCommit)) {
+            case BEGIN:
+                closeCachedConnections();
+                connectionTransaction.begin();
+                TransactionHolder.setInTransaction();
+                break;
+            case COMMIT:
+                connectionTransaction.commit();
+                break;
+            default:
+                break;
+        }
+    }
+    
     private void closeCachedConnections() throws SQLException {
         getForceExecuteTemplate().execute(getCachedConnections().values(), 
Connection::close);
         getCachedConnections().clear();
@@ -272,10 +262,10 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter im
     @Override
     public void commit() throws SQLException {
         try {
-            if (TransactionType.LOCAL == transactionType) {
+            if (connectionTransaction.isLocalTransaction()) {
                 
getForceExecuteTemplate().execute(getCachedConnections().values(), 
Connection::commit);
             } else {
-                transactionManager.commit();
+                connectionTransaction.commit();
             }
         } finally {
             TransactionHolder.clear();
@@ -285,10 +275,10 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter im
     @Override
     public void rollback() throws SQLException {
         try {
-            if (TransactionType.LOCAL == transactionType) {
+            if (connectionTransaction.isLocalTransaction()) {
                 
getForceExecuteTemplate().execute(getCachedConnections().values(), 
Connection::rollback);
             } else {
-                transactionManager.rollback();
+                connectionTransaction.rollback();
             }
         } finally {
             TransactionHolder.clear();
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
index 5c4d637..be4e57a 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
@@ -20,17 +20,18 @@ package org.apache.shardingsphere.driver.executor;
 import lombok.AccessLevel;
 import lombok.Getter;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import 
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.runner.RunWith;
@@ -41,6 +42,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -70,6 +72,7 @@ public abstract class AbstractBaseExecutorTest {
         when(result.getMetaDataContexts()).thenReturn(metaDataContexts);
         when(result.getTransactionContexts()).thenReturn(transactionContexts);
         
when(result.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(mockDataSourceMap());
+        
when(result.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
         return result;
     }
     
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/ConnectionAdapterTest.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/ConnectionAdapterTest.java
index b8822d1..951fb40 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/ConnectionAdapterTest.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/ConnectionAdapterTest.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -31,6 +32,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
@@ -40,6 +42,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public final class ConnectionAdapterTest {
     
@@ -130,7 +133,9 @@ public final class ConnectionAdapterTest {
     }
     
     private ShardingSphereConnection mockShardingSphereConnection(final 
Connection... connections) {
-        ShardingSphereConnection result = new 
ShardingSphereConnection(DefaultSchema.LOGIC_NAME, mock(ContextManager.class, 
RETURNS_DEEP_STUBS));
+        ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
+        
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+        ShardingSphereConnection result = new 
ShardingSphereConnection(DefaultSchema.LOGIC_NAME, contextManager);
         result.getCachedConnections().putAll("", Arrays.asList(connections));
         return result;
     }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
index fcf9030..0cdc27f 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
@@ -28,10 +28,11 @@ import 
org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
 import 
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
 import org.apache.shardingsphere.transaction.TransactionHolder;
+import 
org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
 import org.apache.shardingsphere.transaction.core.TransactionOperationType;
-import org.apache.shardingsphere.transaction.core.TransactionType;
 import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -42,6 +43,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
@@ -92,7 +94,8 @@ public final class ShardingSphereConnectionTest {
         
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
         
when(contextManager.getTransactionContexts()).thenReturn(transactionContexts);
         
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
-        TransactionTypeHolder.set(TransactionType.LOCAL);
+        
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+        connection = new ShardingSphereConnection(DefaultSchema.LOGIC_NAME, 
contextManager);
         connection = new ShardingSphereConnection(DefaultSchema.LOGIC_NAME, 
contextManager);
     }
     
@@ -131,7 +134,9 @@ public final class ShardingSphereConnectionTest {
         
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
         
when(contextManager.getTransactionContexts()).thenReturn(transactionContexts);
         
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
-        TransactionTypeHolder.set(TransactionType.XA);
+        
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
+        TransactionRule transactionRule = new TransactionRule(new 
TransactionRuleConfiguration("XA", null));
+        
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.of(transactionRule));
         connection = new ShardingSphereConnection(connection.getSchemaName(), 
contextManager);
         connection.setAutoCommit(false);
         
assertTrue(XAShardingSphereTransactionManagerFixture.getInvocations().contains(TransactionOperationType.BEGIN));
@@ -147,7 +152,9 @@ public final class ShardingSphereConnectionTest {
         
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
         
when(contextManager.getTransactionContexts()).thenReturn(transactionContexts);
         
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
-        TransactionTypeHolder.set(TransactionType.BASE);
+        
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
+        TransactionRule transactionRule = new TransactionRule(new 
TransactionRuleConfiguration("BASE", null));
+        
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.of(transactionRule));
         connection = new ShardingSphereConnection(connection.getSchemaName(), 
contextManager);
         connection.setAutoCommit(false);
         
assertTrue(BASEShardingSphereTransactionManagerFixture.getInvocations().contains(TransactionOperationType.BEGIN));
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationConnectionTest.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationConnectionTest.java
index 54bf1cf..96fad7d 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationConnectionTest.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationConnectionTest.java
@@ -20,18 +20,27 @@ package org.apache.shardingsphere.driver.jdbc.unsupported;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
 import org.junit.Test;
 
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public final class UnsupportedOperationConnectionTest {
     
-    private final ShardingSphereConnection shardingSphereConnection = new 
ShardingSphereConnection(DefaultSchema.LOGIC_NAME, mock(ContextManager.class, 
RETURNS_DEEP_STUBS));
+    private final ShardingSphereConnection shardingSphereConnection;
+    
+    public UnsupportedOperationConnectionTest() {
+        ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
+        
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+        shardingSphereConnection = new 
ShardingSphereConnection(DefaultSchema.LOGIC_NAME, contextManager);
+    }
     
     @Test(expected = SQLFeatureNotSupportedException.class)
     public void assertPrepareCall() throws SQLException {
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/ok/OKDriverStateTest.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/ok/OKDriverStateTest.java
index 765fbdc..2b4e030 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/ok/OKDriverStateTest.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/ok/OKDriverStateTest.java
@@ -20,23 +20,25 @@ package org.apache.shardingsphere.driver.state.ok;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.transaction.core.TransactionType;
-import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
 import org.junit.Test;
 
 import java.sql.Connection;
+import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public final class OKDriverStateTest {
     
     @Test
     public void assertGetConnection() {
-        TransactionTypeHolder.set(TransactionType.LOCAL);
-        Connection actual = new 
OKDriverState().getConnection(DefaultSchema.LOGIC_NAME, 
mock(ContextManager.class, RETURNS_DEEP_STUBS));
+        ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
+        
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+        Connection actual = new 
OKDriverState().getConnection(DefaultSchema.LOGIC_NAME, contextManager);
         assertThat(actual, instanceOf(ShardingSphereConnection.class));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
 
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
new file mode 100644
index 0000000..6f51519
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.transaction;
+
+import org.apache.shardingsphere.transaction.context.TransactionContexts;
+import org.apache.shardingsphere.transaction.core.TransactionType;
+import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
+import 
org.apache.shardingsphere.transaction.spi.ShardingSphereTransactionManager;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Optional;
+
+/**
+ * Connection transaction.
+ */
+public final class ConnectionTransaction {
+    
+    private final TransactionType transactionType;
+    
+    private final ShardingSphereTransactionManager transactionManager;
+    
+    public ConnectionTransaction(final String schemaName, final 
TransactionContexts transactionContexts) {
+        this(schemaName, TransactionType.LOCAL, transactionContexts);
+    }
+    
+    public ConnectionTransaction(final String schemaName, final 
TransactionRule rule, final TransactionContexts transactionContexts) {
+        this(schemaName, rule.getDefaultType(), transactionContexts);
+    }
+    
+    private ConnectionTransaction(final String schemaName, final 
TransactionType transactionType, final TransactionContexts transactionContexts) 
{
+        this.transactionType = transactionType;
+        transactionManager = 
transactionContexts.getEngines().get(schemaName).getTransactionManager(transactionType);
+        TransactionTypeHolder.set(transactionType);
+    }
+    
+    /**
+     * Whether in transaction.
+     * 
+     * @return in transaction or not
+     */
+    public boolean isInTransaction() {
+        return null != transactionManager && 
transactionManager.isInTransaction();
+    }
+    
+    /**
+     * Judge is local transaction or not.
+     * 
+     * @return is local transaction or not
+     */
+    public boolean isLocalTransaction() {
+        return TransactionType.LOCAL == transactionType;
+    }
+    
+    /**
+     * Whether hold transaction.
+     *
+     * @param autoCommit is auto commit
+     * @return hold transaction or not
+     */
+    public boolean isHoldTransaction(final boolean autoCommit) {
+        return (TransactionType.LOCAL == transactionType && !autoCommit) || 
(TransactionType.XA == transactionType && isInTransaction());
+    }
+    
+    /**
+     * Get connection in transaction.
+     * 
+     * @param dataSourceName data source name
+     * @return connection in transaction
+     * @throws SQLException SQL exception
+     */
+    public Optional<Connection> getConnection(final String dataSourceName) 
throws SQLException {
+        return isInTransaction() ? 
Optional.of(transactionManager.getConnection(dataSourceName)) : 
Optional.empty();
+    }
+    
+    /**
+     * Begin transaction.
+     */
+    public void begin() {
+        transactionManager.begin();
+    }
+    
+    /**
+     * Commit transaction.
+     */
+    public void commit() {
+        transactionManager.commit();
+    }
+    
+    /**
+     * Rollback transaction.
+     */
+    public void rollback() {
+        transactionManager.rollback();
+    }
+    
+    /**
+     * Get distributed transaction operation type.
+     * 
+     * @param autoCommit is auto commit
+     * @return distributed transaction operation type
+     */
+    public DistributedTransactionOperationType 
getDistributedTransactionOperationType(final boolean autoCommit) {
+        if (!autoCommit && !transactionManager.isInTransaction()) {
+            return DistributedTransactionOperationType.BEGIN;
+        }
+        if (autoCommit && transactionManager.isInTransaction()) {
+            return DistributedTransactionOperationType.COMMIT;
+        }
+        return DistributedTransactionOperationType.IGNORE;
+    }
+    
+    public enum DistributedTransactionOperationType {
+        BEGIN, COMMIT, IGNORE
+    }
+}

Reply via email to