This is an automated email from the ASF dual-hosted git repository.
qiulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 68f1615 if in transaction , read write router one ds (#9783)
68f1615 is described below
commit 68f1615dbcad190314f9423b8cde5ce45ce3f7c1
Author: xiaoyu <[email protected]>
AuthorDate: Tue Mar 23 20:17:25 2021 +0800
if in transaction , read write router one ds (#9783)
---
.../impl/ReadWriteSplittingDataSourceRouter.java | 4 +-
.../infra/transaction/TransactionHolder.java | 55 ++++++++++++++++++++++
.../core/connection/ShardingSphereConnection.java | 29 ++++++++----
.../transaction/BackendTransactionManager.java | 4 ++
4 files changed, 82 insertions(+), 10 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-read-write-splitting/shardingsphere-read-write-splitting-route/src/main/java/org/apache/shardingsphere/readwrite/splitting/route/engine/impl/ReadWriteSplittingDataSourceRouter.java
b/shardingsphere-features/shardingsphere-read-write-splitting/shardingsphere-read-write-splitting-route/src/main/java/org/apache/shardingsphere/readwrite/splitting/route/engine/impl/ReadWriteSplittingDataSourceRouter.java
index 72c0817..54188a3 100644
---
a/shardingsphere-features/shardingsphere-read-write-splitting/shardingsphere-read-write-splitting-route/src/main/java/org/apache/shardingsphere/readwrite/splitting/route/engine/impl/ReadWriteSplittingDataSourceRouter.java
+++
b/shardingsphere-features/shardingsphere-read-write-splitting/shardingsphere-read-write-splitting-route/src/main/java/org/apache/shardingsphere/readwrite/splitting/route/engine/impl/ReadWriteSplittingDataSourceRouter.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.aware.DataSourceNameAware;
import org.apache.shardingsphere.infra.aware.DataSourceNameAwareFactory;
import org.apache.shardingsphere.infra.hint.HintManager;
+import org.apache.shardingsphere.infra.transaction.TransactionHolder;
import
org.apache.shardingsphere.readwrite.splitting.common.rule.ReadWriteSplittingDataSourceRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
@@ -70,7 +71,8 @@ public final class ReadWriteSplittingDataSourceRouter {
}
private boolean isPrimaryRoute(final SQLStatement sqlStatement) {
- return containsLockSegment(sqlStatement) || !(sqlStatement instanceof
SelectStatement) || PrimaryVisitedManager.getPrimaryVisited() ||
HintManager.isWriteRouteOnly();
+ return containsLockSegment(sqlStatement) || !(sqlStatement instanceof
SelectStatement)
+ || PrimaryVisitedManager.getPrimaryVisited() ||
HintManager.isWriteRouteOnly() || TransactionHolder.isTransaction();
}
private boolean containsLockSegment(final SQLStatement sqlStatement) {
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/transaction/TransactionHolder.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/transaction/TransactionHolder.java
new file mode 100644
index 0000000..d8f77f8
--- /dev/null
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/transaction/TransactionHolder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.transaction;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+/**
+ * Transaction holder.
+ *
+ * <p>is transaction or not in current thread.</p>
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class TransactionHolder {
+
+ private static final ThreadLocal<Boolean> TRANSACTION =
ThreadLocal.withInitial(() -> false);
+
+ /**
+ * Judge is transaction in current thread.
+ *
+ * @return is transaction in current thread.
+ */
+ public static boolean isTransaction() {
+ return TRANSACTION.get();
+ }
+
+ /**
+ * Set transaction in current thread.
+ */
+ public static void setInTransaction() {
+ TRANSACTION.set(true);
+ }
+
+ /**
+ * Clear transaction.
+ */
+ public static void clear() {
+ TRANSACTION.remove();
+ }
+}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index dd19535..031e06f 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.driver.jdbc.core.connection;
import com.google.common.base.Preconditions;
-import java.sql.Array;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
@@ -29,11 +28,13 @@ import
org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.transaction.TransactionHolder;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.spi.ShardingTransactionManager;
import javax.sql.DataSource;
+import java.sql.Array;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
@@ -233,6 +234,7 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter im
this.autoCommit = autoCommit;
recordMethodInvocation(Connection.class, "setAutoCommit", new
Class[]{boolean.class}, new Object[]{autoCommit});
getForceExecuteTemplate().execute(getCachedConnections().values(),
connection -> connection.setAutoCommit(autoCommit));
+ TransactionHolder.setInTransaction();
return;
}
if (autoCommit != shardingTransactionManager.isInTransaction()) {
@@ -245,6 +247,7 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter im
if (!autoCommit && !shardingTransactionManager.isInTransaction()) {
closeCachedConnections();
shardingTransactionManager.begin();
+ TransactionHolder.setInTransaction();
}
}
@@ -255,19 +258,27 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter im
@Override
public void commit() throws SQLException {
- if (TransactionType.LOCAL == transactionType) {
- getForceExecuteTemplate().execute(getCachedConnections().values(),
Connection::commit);
- } else {
- shardingTransactionManager.commit();
+ try {
+ if (TransactionType.LOCAL == transactionType) {
+
getForceExecuteTemplate().execute(getCachedConnections().values(),
Connection::commit);
+ } else {
+ shardingTransactionManager.commit();
+ }
+ } finally {
+ TransactionHolder.clear();
}
}
@Override
public void rollback() throws SQLException {
- if (TransactionType.LOCAL == transactionType) {
- getForceExecuteTemplate().execute(getCachedConnections().values(),
Connection::rollback);
- } else {
- shardingTransactionManager.rollback();
+ try {
+ if (TransactionType.LOCAL == transactionType) {
+
getForceExecuteTemplate().execute(getCachedConnections().values(),
Connection::rollback);
+ } else {
+ shardingTransactionManager.rollback();
+ }
+ } finally {
+ TransactionHolder.clear();
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
index 64db95d..f78e966 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.transaction.ShardingTransactionManagerEngine;
+import org.apache.shardingsphere.infra.transaction.TransactionHolder;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.spi.ShardingTransactionManager;
@@ -50,6 +51,7 @@ public final class BackendTransactionManager implements
TransactionManager {
public void begin() {
if (!connection.getTransactionStatus().isInTransaction()) {
connection.getTransactionStatus().setInTransaction(true);
+ TransactionHolder.setInTransaction();
connection.closeConnections(false);
}
if (TransactionType.LOCAL == transactionType || null ==
shardingTransactionManager) {
@@ -70,6 +72,7 @@ public final class BackendTransactionManager implements
TransactionManager {
}
} finally {
connection.getTransactionStatus().setInTransaction(false);
+ TransactionHolder.clear();
}
}
}
@@ -85,6 +88,7 @@ public final class BackendTransactionManager implements
TransactionManager {
}
} finally {
connection.getTransactionStatus().setInTransaction(false);
+ TransactionHolder.clear();
}
}
}