This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b63d0c3 Support set autocommit (#13813)
b63d0c3 is described below
commit b63d0c34a8ec788aab20f349d49d22a47795fa39
Author: JingShang Lu <[email protected]>
AuthorDate: Sun Nov 28 02:05:34 2021 +0800
Support set autocommit (#13813)
* support set autocommit
* fix
* fix
* fix
* Update CommandExecutorTask.java
* fix
* Compatible with PG transactions
* fix
* fix
* fix
---
.../jdbc/connection/BackendConnection.java | 2 +
.../transaction/BackendTransactionManager.java | 2 +-
.../transaction/TransactionAutoCommitHandler.java | 44 ++++++++++++++++++++++
.../transaction/TransactionBackendHandler.java | 7 ++++
.../TransactionBackendHandlerFactory.java | 7 +---
.../jdbc/connection/BackendConnectionTest.java | 4 +-
.../transaction/BackendTransactionManagerTest.java | 4 +-
.../TextProtocolBackendHandlerFactoryTest.java | 21 +++--------
.../frontend/command/CommandExecutorTask.java | 5 +++
.../frontend/command/CommandExecutorTaskTest.java | 12 +-----
10 files changed, 72 insertions(+), 36 deletions(-)
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index bef514a..7cb8d8e 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -93,6 +93,8 @@ public final class BackendConnection implements
ExecutorJDBCManager {
private final Map<String, StatementMemoryStrictlyFetchSizeSetter>
fetchSizeSetters;
private final AttributeMap attributeMap;
+
+ private boolean autoCommit = true;
public BackendConnection(final TransactionType initialTransactionType,
final AttributeMap attributeMap) {
transactionStatus = new TransactionStatus(initialTransactionType);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
index 2cc9430..3310444 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
@@ -48,7 +48,7 @@ public final class BackendTransactionManager implements
TransactionManager {
}
@Override
- public void begin() {
+ public void begin() throws SQLException {
if (!connection.getTransactionStatus().isInTransaction()) {
connection.getTransactionStatus().setInTransaction(true);
TransactionHolder.setInTransaction();
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionAutoCommitHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionAutoCommitHandler.java
new file mode 100644
index 0000000..efe4623
--- /dev/null
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionAutoCommitHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.text.transaction;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.SetAutoCommitStatement;
+
+import java.sql.SQLException;
+
+/**
+ * Set autocommit handler.
+ */
+@RequiredArgsConstructor
+public final class TransactionAutoCommitHandler implements
TextProtocolBackendHandler {
+
+ private final SetAutoCommitStatement sqlStatement;
+
+ private final BackendConnection backendConnection;
+
+ @Override
+ public ResponseHeader execute() throws SQLException {
+ backendConnection.setAutoCommit(sqlStatement.isAutoCommit());
+ return new UpdateResponseHeader(sqlStatement);
+ }
+}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandler.java
index 2507117..be944a2 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandler.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.ReleaseSave
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackToSavepointStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.SavepointStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.tcl.MySQLBeginTransactionStatement;
import org.apache.shardingsphere.transaction.core.TransactionOperationType;
import java.sql.SQLException;
@@ -41,10 +42,13 @@ public final class TransactionBackendHandler implements
TextProtocolBackendHandl
private final TransactionOperationType operationType;
private final BackendTransactionManager backendTransactionManager;
+
+ private final BackendConnection backendConnection;
public TransactionBackendHandler(final TCLStatement tclStatement, final
TransactionOperationType operationType, final BackendConnection
backendConnection) {
this.tclStatement = tclStatement;
this.operationType = operationType;
+ this.backendConnection = backendConnection;
backendTransactionManager = new
BackendTransactionManager(backendConnection);
}
@@ -52,6 +56,9 @@ public final class TransactionBackendHandler implements
TextProtocolBackendHandl
public ResponseHeader execute() throws SQLException {
switch (operationType) {
case BEGIN:
+ if (tclStatement instanceof MySQLBeginTransactionStatement &&
backendConnection.getTransactionStatus().isInTransaction()) {
+ backendTransactionManager.commit();
+ }
backendTransactionManager.begin();
break;
case SAVEPOINT:
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
index c2f6980..a2d63c2 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
@@ -23,7 +23,6 @@ import
org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.data.impl.BroadcastDatabaseBackendHandler;
-import org.apache.shardingsphere.proxy.backend.text.skip.SkipBackendHandler;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.BeginTransactionStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.ReleaseSavepointStatement;
@@ -54,11 +53,7 @@ public final class TransactionBackendHandlerFactory {
return new TransactionBackendHandler(tclStatement,
TransactionOperationType.BEGIN, backendConnection);
}
if (tclStatement instanceof SetAutoCommitStatement) {
- if (((SetAutoCommitStatement) tclStatement).isAutoCommit()) {
- return
backendConnection.getTransactionStatus().isInTransaction()
- ? new TransactionBackendHandler(tclStatement,
TransactionOperationType.COMMIT, backendConnection) : new
SkipBackendHandler(tclStatement);
- }
- return new TransactionBackendHandler(tclStatement,
TransactionOperationType.BEGIN, backendConnection);
+ return new TransactionAutoCommitHandler((SetAutoCommitStatement)
tclStatement, backendConnection);
}
if (tclStatement instanceof SavepointStatement) {
return new TransactionBackendHandler(tclStatement,
TransactionOperationType.SAVEPOINT, backendConnection);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
index 9bef4db..b3742a2 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
@@ -213,14 +213,14 @@ public final class BackendConnectionTest {
}
@Test(expected = ShardingSphereException.class)
- public void assertFailedSwitchTransactionTypeWhileBegin() {
+ public void assertFailedSwitchTransactionTypeWhileBegin() throws
SQLException {
BackendTransactionManager transactionManager = new
BackendTransactionManager(backendConnection);
transactionManager.begin();
backendConnection.getTransactionStatus().setTransactionType(TransactionType.XA);
}
@Test(expected = ShardingSphereException.class)
- public void assertFailedSwitchSchemaWhileBegin() {
+ public void assertFailedSwitchSchemaWhileBegin() throws SQLException {
BackendTransactionManager transactionManager = new
BackendTransactionManager(backendConnection);
transactionManager.begin();
backendConnection.setCurrentSchema("newSchema");
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManagerTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManagerTest.java
index 056a3af..539e083 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManagerTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManagerTest.java
@@ -84,7 +84,7 @@ public final class BackendTransactionManagerTest {
}
@Test
- public void assertBeginForLocalTransaction() {
+ public void assertBeginForLocalTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, false);
backendTransactionManager.begin();
verify(transactionStatus).setInTransaction(true);
@@ -94,7 +94,7 @@ public final class BackendTransactionManagerTest {
}
@Test
- public void assertBeginForDistributedTransaction() {
+ public void assertBeginForDistributedTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.XA, true);
backendTransactionManager.begin();
verify(transactionStatus, times(0)).setInTransaction(true);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
index 1cc027a..3c27904 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
@@ -35,6 +35,7 @@ import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.HintDistS
import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.SetDistSQLBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.ShowDistSQLBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.skip.SkipBackendHandler;
+import
org.apache.shardingsphere.proxy.backend.text.transaction.TransactionAutoCommitHandler;
import
org.apache.shardingsphere.proxy.backend.text.transaction.TransactionBackendHandler;
import org.apache.shardingsphere.sql.parser.exception.SQLParsingException;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
@@ -135,38 +136,28 @@ public final class TextProtocolBackendHandlerFactoryTest {
public void assertNewInstanceWithSetAutoCommitToOff() throws SQLException {
String sql = "SET AUTOCOMMIT=0";
TextProtocolBackendHandler actual =
TextProtocolBackendHandlerFactory.newInstance(databaseType, sql,
backendConnection);
- assertThat(actual, instanceOf(TransactionBackendHandler.class));
+ assertThat(actual, instanceOf(TransactionAutoCommitHandler.class));
}
@Test
public void assertNewInstanceWithScopeSetAutoCommitToOff() throws
SQLException {
String sql = "SET @@SESSION.AUTOCOMMIT = OFF";
TextProtocolBackendHandler actual =
TextProtocolBackendHandlerFactory.newInstance(databaseType, sql,
backendConnection);
- assertThat(actual, instanceOf(TransactionBackendHandler.class));
+ assertThat(actual, instanceOf(TransactionAutoCommitHandler.class));
}
@Test
- public void assertNewInstanceWithSetAutoCommitToOnForInTransaction()
throws SQLException {
-
when(backendConnection.getTransactionStatus().isInTransaction()).thenReturn(true);
+ public void assertNewInstanceWithSetAutoCommitToOn() throws SQLException {
String sql = "SET AUTOCOMMIT=1";
TextProtocolBackendHandler actual =
TextProtocolBackendHandlerFactory.newInstance(databaseType, sql,
backendConnection);
- assertThat(actual, instanceOf(TransactionBackendHandler.class));
+ assertThat(actual, instanceOf(TransactionAutoCommitHandler.class));
}
@Test
public void assertNewInstanceWithScopeSetAutoCommitToOnForInTransaction()
throws SQLException {
-
when(backendConnection.getTransactionStatus().isInTransaction()).thenReturn(true);
String sql = "SET @@SESSION.AUTOCOMMIT = ON";
TextProtocolBackendHandler actual =
TextProtocolBackendHandlerFactory.newInstance(databaseType, sql,
backendConnection);
- assertThat(actual, instanceOf(TransactionBackendHandler.class));
- }
-
- @Test
- public void assertNewInstanceWithSetAutoCommitToOnForNotInTransaction()
throws SQLException {
- String sql = "SET AUTOCOMMIT=1";
-
when(backendConnection.getTransactionStatus().isInTransaction()).thenReturn(false);
- TextProtocolBackendHandler actual =
TextProtocolBackendHandlerFactory.newInstance(databaseType, sql,
backendConnection);
- assertThat(actual, instanceOf(SkipBackendHandler.class));
+ assertThat(actual, instanceOf(TransactionAutoCommitHandler.class));
}
@Test
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 8ddb323..660aad9 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import
org.apache.shardingsphere.proxy.backend.communication.SQLStatementSchemaHolder;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionStatus;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.BackendTransactionManager;
import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
import
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.exception.ExpectedExceptions;
@@ -69,6 +70,10 @@ public final class CommandExecutorTask implements Runnable {
connectionStatus.waitUntilConnectionRelease();
connectionStatus.switchToUsing();
}
+ if (!backendConnection.isAutoCommit() &&
!backendConnection.getTransactionStatus().isInTransaction()) {
+ BackendTransactionManager transactionManager = new
BackendTransactionManager(backendConnection);
+ transactionManager.begin();
+ }
isNeedFlush = executeCommand(context, payload, backendConnection);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 053c6f8..b5029d1 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -93,6 +93,7 @@ public final class CommandExecutorTaskTest {
@Before
public void setup() {
when(backendConnection.closeDatabaseCommunicationEngines(anyBoolean())).thenReturn(Collections.emptyList());
+ when(backendConnection.isAutoCommit()).thenReturn(true);
when(handlerContext.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()).thenReturn(StandardCharsets.UTF_8);
}
@@ -108,9 +109,6 @@ public final class CommandExecutorTaskTest {
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
actual.run();
- verify(connectionStatus).waitUntilConnectionRelease();
- verify(connectionStatus).switchToUsing();
- verify(connectionStatus).switchToReleased();
verify(queryCommandExecutor).close();
verify(backendConnection).closeDatabaseCommunicationEngines(true);
}
@@ -128,9 +126,6 @@ public final class CommandExecutorTaskTest {
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
actual.run();
- verify(connectionStatus).waitUntilConnectionRelease();
- verify(connectionStatus).switchToUsing();
- verify(connectionStatus).switchToReleased();
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
verify(engine.getCommandExecuteEngine()).writeQueryData(handlerContext,
backendConnection, queryCommandExecutor, 1);
@@ -152,9 +147,6 @@ public final class CommandExecutorTaskTest {
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
actual.run();
- verify(connectionStatus).waitUntilConnectionRelease();
- verify(connectionStatus).switchToUsing();
- verify(connectionStatus).switchToReleased();
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
verify(commandExecutor).close();
@@ -165,7 +157,7 @@ public final class CommandExecutorTaskTest {
public void assertRunWithError() {
RuntimeException mockException = new RuntimeException("mock");
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
- doThrow(mockException).when(connectionStatus).switchToUsing();
+ doThrow(mockException).when(backendConnection).isAutoCommit();
when(engine.getCodecEngine().createPacketPayload(message,
StandardCharsets.UTF_8)).thenReturn(payload);
when(engine.getCommandExecuteEngine().getErrorPacket(mockException,
backendConnection)).thenReturn(databasePacket);
when(engine.getCommandExecuteEngine().getOtherPacket(backendConnection)).thenReturn(Optional.of(databasePacket));