This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1d0e278 Add ConnectionTransaction (#12888)
1d0e278 is described below
commit 1d0e2789f42bb81d46bf4486cc81db82a3d90246
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Oct 2 18:32:04 2021 +0800
Add ConnectionTransaction (#12888)
* Add ContextManager.getDataSourceMap()
* Fix test cases
* Add ConnectionTransaction
* Add ConnectionTransaction
---
.../metadata/rule/ShardingSphereRuleMetaData.java | 13 ++
.../core/connection/ShardingSphereConnection.java | 88 ++++++--------
.../driver/executor/AbstractBaseExecutorTest.java | 7 +-
.../driver/jdbc/adapter/ConnectionAdapterTest.java | 7 +-
.../connection/ShardingSphereConnectionTest.java | 15 ++-
.../UnsupportedOperationConnectionTest.java | 11 +-
.../driver/state/ok/OKDriverStateTest.java | 10 +-
.../transaction/ConnectionTransaction.java | 132 +++++++++++++++++++++
8 files changed, 222 insertions(+), 61 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/rule/ShardingSphereRuleMetaData.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/rule/ShardingSphereRuleMetaData.java
index 63b564f..e417311 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/rule/ShardingSphereRuleMetaData.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/rule/ShardingSphereRuleMetaData.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import java.util.Collection;
+import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -46,4 +47,16 @@ public final class ShardingSphereRuleMetaData {
public <T extends ShardingSphereRule> Collection<T> findRules(final
Class<T> clazz) {
return rules.stream().filter(each ->
clazz.isAssignableFrom(each.getClass())).map(clazz::cast).collect(Collectors.toList());
}
+
+ /**
+ * Find single rule by class.
+ *
+ * <p>Delegates to {@code findRules(clazz)} and keeps only the first element of
+ * its result. Any further matching rules are ignored.</p>
+ *
+ * @param clazz target class
+ * @param <T> type of rule
+ * @return first rule found for the class, or empty if no rule matches
+ */
+ public <T extends ShardingSphereRule> Optional<T> findSingleRule(final
Class<T> clazz) {
+ Collection<T> foundRules = findRules(clazz);
+ return foundRules.isEmpty() ? Optional.empty() :
Optional.of(foundRules.iterator().next());
+ }
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index b6f5b23..0dae9cb 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -27,11 +27,9 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMod
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.transaction.ConnectionTransaction;
import org.apache.shardingsphere.transaction.TransactionHolder;
-import org.apache.shardingsphere.transaction.core.TransactionType;
-import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
import org.apache.shardingsphere.transaction.rule.TransactionRule;
-import
org.apache.shardingsphere.transaction.spi.ShardingSphereTransactionManager;
import javax.sql.DataSource;
import java.sql.Array;
@@ -44,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
/**
* ShardingSphere Connection.
@@ -56,30 +55,16 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter im
@Getter
private final ContextManager contextManager;
- private final TransactionType transactionType;
-
- private final ShardingSphereTransactionManager transactionManager;
+ private final ConnectionTransaction connectionTransaction;
private boolean autoCommit = true;
public ShardingSphereConnection(final String schemaName, final
ContextManager contextManager) {
this.schemaName = schemaName;
this.contextManager = contextManager;
- transactionType = getTransactionType(contextManager);
- transactionManager =
contextManager.getTransactionContexts().getEngines().get(schemaName).getTransactionManager(transactionType);
- }
-
- private TransactionType getTransactionType(final ContextManager
contextManager) {
- if (null != TransactionTypeHolder.get()) {
- return TransactionTypeHolder.get();
- }
- Collection<TransactionRule> rules =
contextManager.getMetaDataContexts().getGlobalRuleMetaData().findRules(TransactionRule.class);
- if (rules.isEmpty()) {
- return TransactionType.LOCAL;
- }
- TransactionType result = rules.iterator().next().getDefaultType();
- TransactionTypeHolder.set(result);
- return result;
+ Optional<TransactionRule> transactionRule =
contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class);
+ connectionTransaction = transactionRule.map(optional -> new
ConnectionTransaction(schemaName, optional,
contextManager.getTransactionContexts()))
+ .orElseGet(() -> new ConnectionTransaction(schemaName,
contextManager.getTransactionContexts()));
}
/**
@@ -154,11 +139,8 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter im
}
private Connection createConnection(final String dataSourceName, final
DataSource dataSource) throws SQLException {
- return isInTransaction() ?
transactionManager.getConnection(dataSourceName) : dataSource.getConnection();
- }
-
- private boolean isInTransaction() {
- return null != transactionManager &&
transactionManager.isInTransaction();
+ Optional<Connection> connectionInTransaction =
connectionTransaction.getConnection(dataSourceName);
+ return connectionInTransaction.isPresent() ?
connectionInTransaction.get() : dataSource.getConnection();
}
/**
@@ -167,7 +149,7 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter im
* @return true or false
*/
public boolean isHoldTransaction() {
- return (TransactionType.LOCAL == transactionType && !autoCommit) ||
(TransactionType.XA == transactionType && isInTransaction());
+ return connectionTransaction.isHoldTransaction(autoCommit);
}
@SuppressWarnings("MagicConstant")
@@ -241,29 +223,37 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter im
@Override
public void setAutoCommit(final boolean autoCommit) throws SQLException {
- if (TransactionType.LOCAL == transactionType) {
- this.autoCommit = autoCommit;
- recordMethodInvocation(Connection.class, "setAutoCommit", new
Class[]{boolean.class}, new Object[]{autoCommit});
- getForceExecuteTemplate().execute(getCachedConnections().values(),
connection -> connection.setAutoCommit(autoCommit));
- if (!autoCommit) {
- TransactionHolder.setInTransaction();
- }
- return;
- }
- if (autoCommit != transactionManager.isInTransaction()) {
- return;
- }
- if (autoCommit && transactionManager.isInTransaction()) {
- transactionManager.commit();
- return;
+ if (connectionTransaction.isLocalTransaction()) {
+ processLocalTransaction(autoCommit);
+ } else {
+ processDistributeTransaction(autoCommit);
}
- if (!autoCommit && !transactionManager.isInTransaction()) {
- closeCachedConnections();
- transactionManager.begin();
+ }
+
+ private void processLocalTransaction(final boolean autoCommit) throws
SQLException {
+ this.autoCommit = autoCommit;
+ recordMethodInvocation(Connection.class, "setAutoCommit", new
Class[]{boolean.class}, new Object[]{autoCommit});
+ getForceExecuteTemplate().execute(getCachedConnections().values(),
connection -> connection.setAutoCommit(autoCommit));
+ if (!autoCommit) {
TransactionHolder.setInTransaction();
}
}
+ private void processDistributeTransaction(final boolean autoCommit) throws
SQLException {
+ switch
(connectionTransaction.getDistributedTransactionOperationType(autoCommit)) {
+ case BEGIN:
+ closeCachedConnections();
+ connectionTransaction.begin();
+ TransactionHolder.setInTransaction();
+ break;
+ case COMMIT:
+ connectionTransaction.commit();
+ break;
+ default:
+ break;
+ }
+ }
+
private void closeCachedConnections() throws SQLException {
getForceExecuteTemplate().execute(getCachedConnections().values(),
Connection::close);
getCachedConnections().clear();
@@ -272,10 +262,10 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter im
@Override
public void commit() throws SQLException {
try {
- if (TransactionType.LOCAL == transactionType) {
+ if (connectionTransaction.isLocalTransaction()) {
getForceExecuteTemplate().execute(getCachedConnections().values(),
Connection::commit);
} else {
- transactionManager.commit();
+ connectionTransaction.commit();
}
} finally {
TransactionHolder.clear();
@@ -285,10 +275,10 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter im
@Override
public void rollback() throws SQLException {
try {
- if (TransactionType.LOCAL == transactionType) {
+ if (connectionTransaction.isLocalTransaction()) {
getForceExecuteTemplate().execute(getCachedConnections().values(),
Connection::rollback);
} else {
- transactionManager.rollback();
+ connectionTransaction.rollback();
}
} finally {
TransactionHolder.clear();
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
index 5c4d637..be4e57a 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
@@ -20,17 +20,18 @@ package org.apache.shardingsphere.driver.executor;
import lombok.AccessLevel;
import lombok.Getter;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
@@ -41,6 +42,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -70,6 +72,7 @@ public abstract class AbstractBaseExecutorTest {
when(result.getMetaDataContexts()).thenReturn(metaDataContexts);
when(result.getTransactionContexts()).thenReturn(transactionContexts);
when(result.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(mockDataSourceMap());
+
when(result.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
return result;
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/ConnectionAdapterTest.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/ConnectionAdapterTest.java
index b8822d1..951fb40 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/ConnectionAdapterTest.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/ConnectionAdapterTest.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
import org.junit.Test;
import java.lang.reflect.Field;
@@ -31,6 +32,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
@@ -40,6 +42,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public final class ConnectionAdapterTest {
@@ -130,7 +133,9 @@ public final class ConnectionAdapterTest {
}
private ShardingSphereConnection mockShardingSphereConnection(final
Connection... connections) {
- ShardingSphereConnection result = new
ShardingSphereConnection(DefaultSchema.LOGIC_NAME, mock(ContextManager.class,
RETURNS_DEEP_STUBS));
+ ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
+
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+ ShardingSphereConnection result = new
ShardingSphereConnection(DefaultSchema.LOGIC_NAME, contextManager);
result.getCachedConnections().putAll("", Arrays.asList(connections));
return result;
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
index fcf9030..0cdc27f 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnectionTest.java
@@ -28,10 +28,11 @@ import
org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import
org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
import org.apache.shardingsphere.transaction.TransactionHolder;
+import
org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.apache.shardingsphere.transaction.core.TransactionOperationType;
-import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -42,6 +43,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
@@ -92,7 +94,8 @@ public final class ShardingSphereConnectionTest {
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
when(contextManager.getTransactionContexts()).thenReturn(transactionContexts);
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
- TransactionTypeHolder.set(TransactionType.LOCAL);
+
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+ connection = new ShardingSphereConnection(DefaultSchema.LOGIC_NAME,
contextManager);
connection = new ShardingSphereConnection(DefaultSchema.LOGIC_NAME,
contextManager);
}
@@ -131,7 +134,9 @@ public final class ShardingSphereConnectionTest {
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
when(contextManager.getTransactionContexts()).thenReturn(transactionContexts);
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
- TransactionTypeHolder.set(TransactionType.XA);
+
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
+ TransactionRule transactionRule = new TransactionRule(new
TransactionRuleConfiguration("XA", null));
+
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.of(transactionRule));
connection = new ShardingSphereConnection(connection.getSchemaName(),
contextManager);
connection.setAutoCommit(false);
assertTrue(XAShardingSphereTransactionManagerFixture.getInvocations().contains(TransactionOperationType.BEGIN));
@@ -147,7 +152,9 @@ public final class ShardingSphereConnectionTest {
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
when(contextManager.getTransactionContexts()).thenReturn(transactionContexts);
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
- TransactionTypeHolder.set(TransactionType.BASE);
+
when(contextManager.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
+ TransactionRule transactionRule = new TransactionRule(new
TransactionRuleConfiguration("BASE", null));
+
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.of(transactionRule));
connection = new ShardingSphereConnection(connection.getSchemaName(),
contextManager);
connection.setAutoCommit(false);
assertTrue(BASEShardingSphereTransactionManagerFixture.getInvocations().contains(TransactionOperationType.BEGIN));
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationConnectionTest.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationConnectionTest.java
index 54bf1cf..96fad7d 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationConnectionTest.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationConnectionTest.java
@@ -20,18 +20,27 @@ package org.apache.shardingsphere.driver.jdbc.unsupported;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
import org.junit.Test;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
+import java.util.Optional;
import java.util.Properties;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public final class UnsupportedOperationConnectionTest {
- private final ShardingSphereConnection shardingSphereConnection = new
ShardingSphereConnection(DefaultSchema.LOGIC_NAME, mock(ContextManager.class,
RETURNS_DEEP_STUBS));
+ private final ShardingSphereConnection shardingSphereConnection;
+
+ public UnsupportedOperationConnectionTest() {
+ ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
+
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+ shardingSphereConnection = new
ShardingSphereConnection(DefaultSchema.LOGIC_NAME, contextManager);
+ }
@Test(expected = SQLFeatureNotSupportedException.class)
public void assertPrepareCall() throws SQLException {
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/ok/OKDriverStateTest.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/ok/OKDriverStateTest.java
index 765fbdc..2b4e030 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/ok/OKDriverStateTest.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/state/ok/OKDriverStateTest.java
@@ -20,23 +20,25 @@ package org.apache.shardingsphere.driver.state.ok;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.transaction.core.TransactionType;
-import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
import org.junit.Test;
import java.sql.Connection;
+import java.util.Optional;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public final class OKDriverStateTest {
@Test
public void assertGetConnection() {
- TransactionTypeHolder.set(TransactionType.LOCAL);
- Connection actual = new
OKDriverState().getConnection(DefaultSchema.LOGIC_NAME,
mock(ContextManager.class, RETURNS_DEEP_STUBS));
+ ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
+
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+ Connection actual = new
OKDriverState().getConnection(DefaultSchema.LOGIC_NAME, contextManager);
assertThat(actual, instanceOf(ShardingSphereConnection.class));
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
new file mode 100644
index 0000000..6f51519
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.transaction;
+
+import org.apache.shardingsphere.transaction.context.TransactionContexts;
+import org.apache.shardingsphere.transaction.core.TransactionType;
+import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
+import
org.apache.shardingsphere.transaction.spi.ShardingSphereTransactionManager;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Optional;
+
+/**
+ * Connection transaction.
+ *
+ * <p>Holds the transaction type selected for a connection together with the
+ * transaction manager obtained for that type from the engine registered under
+ * the schema name in {@code TransactionContexts}. When constructed without a
+ * {@code TransactionRule} the type is {@code TransactionType.LOCAL}; when
+ * constructed with a rule the type is the rule's default type. Construction
+ * also stores the selected type in {@code TransactionTypeHolder}.</p>
+ *
+ * <p>NOTE(review): {@code isInTransaction()} tolerates a {@code null}
+ * transaction manager, while {@code begin()}, {@code commit()},
+ * {@code rollback()} and {@code getDistributedTransactionOperationType(boolean)}
+ * dereference it without a check. Confirm whether the manager can be
+ * {@code null} for the types these methods are used with.</p>
+ */
+public final class ConnectionTransaction {
+
+ private final TransactionType transactionType;
+
+ private final ShardingSphereTransactionManager transactionManager;
+
+ public ConnectionTransaction(final String schemaName, final
TransactionContexts transactionContexts) {
+ this(schemaName, TransactionType.LOCAL, transactionContexts);
+ }
+
+ public ConnectionTransaction(final String schemaName, final
TransactionRule rule, final TransactionContexts transactionContexts) {
+ this(schemaName, rule.getDefaultType(), transactionContexts);
+ }
+
+ private ConnectionTransaction(final String schemaName, final
TransactionType transactionType, final TransactionContexts transactionContexts)
{
+ this.transactionType = transactionType;
+ transactionManager =
transactionContexts.getEngines().get(schemaName).getTransactionManager(transactionType);
+ TransactionTypeHolder.set(transactionType);
+ }
+
+ /**
+ * Whether in transaction.
+ *
+ * <p>Returns {@code false} when no transaction manager is held; otherwise
+ * returns what the transaction manager reports.</p>
+ *
+ * @return in transaction or not
+ */
+ public boolean isInTransaction() {
+ return null != transactionManager &&
transactionManager.isInTransaction();
+ }
+
+ /**
+ * Whether the transaction type is local.
+ *
+ * @return local transaction or not
+ */
+ public boolean isLocalTransaction() {
+ return TransactionType.LOCAL == transactionType;
+ }
+
+ /**
+ * Whether hold transaction.
+ *
+ * <p>A transaction is held when the type is {@code LOCAL} and auto commit is
+ * off, or when the type is {@code XA} and {@code isInTransaction()} is true.
+ * Every other combination yields {@code false}.</p>
+ *
+ * @param autoCommit is auto commit
+ * @return hold transaction or not
+ */
+ public boolean isHoldTransaction(final boolean autoCommit) {
+ return (TransactionType.LOCAL == transactionType && !autoCommit) ||
(TransactionType.XA == transactionType && isInTransaction());
+ }
+
+ /**
+ * Get connection in transaction.
+ *
+ * <p>The connection is requested from the transaction manager only when
+ * {@code isInTransaction()} is true; otherwise nothing is requested.</p>
+ *
+ * @param dataSourceName data source name
+ * @return connection from the transaction manager, or empty if not in transaction
+ * @throws SQLException SQL exception
+ */
+ public Optional<Connection> getConnection(final String dataSourceName)
throws SQLException {
+ return isInTransaction() ?
Optional.of(transactionManager.getConnection(dataSourceName)) :
Optional.empty();
+ }
+
+ /**
+ * Begin transaction.
+ *
+ * <p>Delegates to the transaction manager.</p>
+ */
+ public void begin() {
+ transactionManager.begin();
+ }
+
+ /**
+ * Commit transaction.
+ *
+ * <p>Delegates to the transaction manager.</p>
+ */
+ public void commit() {
+ transactionManager.commit();
+ }
+
+ /**
+ * Rollback transaction.
+ *
+ * <p>Delegates to the transaction manager.</p>
+ */
+ public void rollback() {
+ transactionManager.rollback();
+ }
+
+ /**
+ * Get distributed transaction operation type.
+ *
+ * <p>Returns {@code BEGIN} when auto commit is being turned off and the
+ * transaction manager is not in a transaction, {@code COMMIT} when auto
+ * commit is being turned on and the transaction manager is in a transaction,
+ * and {@code IGNORE} in every other case.</p>
+ *
+ * @param autoCommit is auto commit
+ * @return distributed transaction operation type
+ */
+ public DistributedTransactionOperationType
getDistributedTransactionOperationType(final boolean autoCommit) {
+ if (!autoCommit && !transactionManager.isInTransaction()) {
+ return DistributedTransactionOperationType.BEGIN;
+ }
+ if (autoCommit && transactionManager.isInTransaction()) {
+ return DistributedTransactionOperationType.COMMIT;
+ }
+ return DistributedTransactionOperationType.IGNORE;
+ }
+
+ public enum DistributedTransactionOperationType {
+ BEGIN, COMMIT, IGNORE
+ }
+}