This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 74d7d6370f2 fix autocommmit (#18161)
74d7d6370f2 is described below
commit 74d7d6370f2a473ed40938348d66a1411da44093
Author: JingShang Lu <[email protected]>
AuthorDate: Mon Jun 6 16:59:04 2022 +0800
fix autocommmit (#18161)
* fix autocommit for multiStatements
* fix
* fix
* fix
* fix
* fix
* fix
* fix
* fix
* fix
* fix
---
.../proxy/backend/communication/BackendConnection.java | 8 ++++++++
.../jdbc/connection/JDBCBackendConnection.java | 15 ++++++++++-----
.../communication/vertx/VertxBackendConnection.java | 5 +++++
.../backend/text/TextProtocolBackendHandlerFactory.java | 2 +-
.../jdbc/connection/JDBCBackendConnectionTest.java | 4 +---
.../proxy/frontend/command/CommandExecutorTask.java | 1 +
.../query/binary/execute/MySQLComStmtExecuteExecutor.java | 4 ++++
.../query/binary/prepare/MySQLComStmtPrepareExecutor.java | 4 +++-
.../query/text/query/MySQLMultiStatementsHandler.java | 3 ++-
.../binary/prepare/MySQLComStmtPrepareExecutorTest.java | 4 +++-
.../extended/bind/OpenGaussComBatchBindExecutor.java | 1 +
.../extended/PostgreSQLBatchedStatementsExecutor.java | 1 +
.../query/extended/bind/PostgreSQLComBindExecutor.java | 1 +
13 files changed, 41 insertions(+), 12 deletions(-)
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java
index 94c18688fe1..e420808bf11 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java
@@ -44,6 +44,14 @@ public interface BackendConnection<T> {
*/
T prepareForTaskExecution() throws SQLException;
+ /**
+ * Handle auto commit.
+ *
+ * @return can be Void or Future
+ * @throws SQLException SQL exception
+ */
+ T handleAutoCommit() throws SQLException;
+
/**
* Close resources used in execution.
*
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
index be802510913..569de746702 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
@@ -175,17 +175,22 @@ public final class JDBCBackendConnection implements
BackendConnection<Void>, Exe
}
@Override
- public Void prepareForTaskExecution() throws SQLException {
+ public Void prepareForTaskExecution() {
synchronized (this) {
connectionReferenceCount++;
- if (!connectionSession.isAutoCommit() &&
!connectionSession.getTransactionStatus().isInTransaction()) {
- JDBCBackendTransactionManager transactionManager = new
JDBCBackendTransactionManager(this);
- transactionManager.begin();
- }
return null;
}
}
+ @Override
+ public Void handleAutoCommit() throws SQLException {
+ if (!connectionSession.isAutoCommit() &&
!connectionSession.getTransactionStatus().isInTransaction()) {
+ JDBCBackendTransactionManager transactionManager = new
JDBCBackendTransactionManager(this);
+ transactionManager.begin();
+ }
+ return null;
+ }
+
@Override
public Void closeExecutionResources() throws BackendConnectionException {
synchronized (this) {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
index bf66fc6d5d6..38a40ed3569 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
@@ -115,6 +115,11 @@ public final class VertxBackendConnection implements
BackendConnection<Future<Vo
@Override
public Future<Void> prepareForTaskExecution() {
+ return Future.succeededFuture();
+ }
+
+ @Override
+ public Future<Void> handleAutoCommit() {
if (!connectionSession.isAutoCommit() &&
!connectionSession.getTransactionStatus().isInTransaction()) {
VertxLocalTransactionManager transactionManager = new
VertxLocalTransactionManager(this);
return transactionManager.begin();
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java
index dd694ca28c8..4f97aa9a78b 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java
@@ -141,7 +141,7 @@ public final class TextProtocolBackendHandlerFactory {
private static void handleAutoCommit(final SQLStatement sqlStatement,
final ConnectionSession connectionSession) throws SQLException {
if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
- connectionSession.getBackendConnection().prepareForTaskExecution();
+ connectionSession.getBackendConnection().handleAutoCommit();
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
index 6523e087601..fa1f6c0f3b6 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
@@ -383,10 +383,8 @@ public final class JDBCBackendConnectionTest extends
ProxyContextRestorer {
}
@Test
- public void assertPrepareForTaskExecution() throws SQLException {
+ public void assertPrepareForTaskExecution() {
backendConnection.prepareForTaskExecution();
- verify(backendConnection).closeDatabaseCommunicationEngines(true);
- verify(backendConnection).closeConnections(false);
}
@Test
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 9beb10d5cbd..beed7cab77d 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -73,6 +73,7 @@ public final class CommandExecutorTask implements Runnable {
if (sqlShowEnabled) {
fillLogMDC();
}
+ connectionSession.getBackendConnection().prepareForTaskExecution();
isNeedFlush = executeCommand(context, payload);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index 11ecd2bf612..f3923a42414 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -54,6 +54,7 @@ import
org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
+import org.apache.shardingsphere.transaction.utils.AutoCommitUtils;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -86,6 +87,9 @@ public final class MySQLComStmtExecuteExecutor implements
QueryCommandExecutor {
Preconditions.checkState(sqlParserRule.isPresent());
SQLStatement sqlStatement = sqlParserRule.get().getSQLParserEngine(
DatabaseTypeEngine.getTrunkDatabaseTypeName(metaDataContexts.getMetaData().getDatabases().get(databaseName).getProtocolType())).parse(packet.getSql(),
true);
+ if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
+ connectionSession.getBackendConnection().handleAutoCommit();
+ }
SQLStatementContext<?> sqlStatementContext =
SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(),
packet.getParameters(),
sqlStatement, connectionSession.getDefaultDatabaseName());
// TODO optimize SQLStatementDatabaseHolder
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
index a131bcb5ee5..3cadd04f639 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
@@ -40,6 +40,7 @@ import
org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedSta
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
@@ -65,13 +66,14 @@ public final class MySQLComStmtPrepareExecutor implements
CommandExecutor {
}
@Override
- public Collection<DatabasePacket<?>> execute() {
+ public Collection<DatabasePacket<?>> execute() throws SQLException {
failedIfContainsMultiStatements();
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
Optional<SQLParserRule> sqlParserRule =
metaDataContexts.getMetaData().getGlobalRuleMetaData().findSingleRule(SQLParserRule.class);
Preconditions.checkState(sqlParserRule.isPresent());
SQLStatement sqlStatement = sqlParserRule.get().getSQLParserEngine(
DatabaseTypeEngine.getTrunkDatabaseTypeName(metaDataContexts.getMetaData().getDatabases().get(connectionSession.getDatabaseName()).getProtocolType())).parse(packet.getSql(),
true);
+ connectionSession.getBackendConnection().handleAutoCommit();
if (!MySQLComStmtPrepareChecker.isStatementAllowed(sqlStatement)) {
throw new UnsupportedPreparedStatementException();
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
index 044d633d224..660df55c3fb 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
@@ -90,7 +90,8 @@ public final class MySQLMultiStatementsHandler implements
TextProtocolBackendHan
private ExecutionContext anyExecutionContext;
- public MySQLMultiStatementsHandler(final ConnectionSession
connectionSession, final SQLStatement sqlStatementSample, final String sql) {
+ public MySQLMultiStatementsHandler(final ConnectionSession
connectionSession, final SQLStatement sqlStatementSample, final String sql)
throws SQLException {
+ connectionSession.getBackendConnection().handleAutoCommit();
this.connectionSession = connectionSession;
this.sqlStatementSample = sqlStatementSample;
Pattern pattern = sqlStatementSample instanceof UpdateStatement ?
MULTI_UPDATE_STATEMENTS : MULTI_DELETE_STATEMENTS;
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
index 764368445e0..10f4ca924d2 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
@@ -28,6 +28,8 @@ import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.sql.SQLException;
+
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -40,7 +42,7 @@ public final class MySQLComStmtPrepareExecutorTest {
private ConnectionSession connectionSession;
@Test(expected = UnsupportedPreparedStatementException.class)
- public void assertPrepareMultiStatements() {
+ public void assertPrepareMultiStatements() throws SQLException {
when(packet.getSql()).thenReturn("update t set v=v+1 where id=1;update
t set v=v+1 where id=2;update t set v=v+1 where id=3");
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_UNICODE_CI);
when(connectionSession.getAttributeMap().hasAttr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS)).thenReturn(true);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutor.java
index 5a02ef0c842..56f22b0e041 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutor.java
@@ -46,6 +46,7 @@ public final class OpenGaussComBatchBindExecutor implements
CommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
+ connectionSession.getBackendConnection().handleAutoCommit();
PostgreSQLPreparedStatement preparedStatement =
PostgreSQLPreparedStatementRegistry.getInstance().get(connectionSession.getConnectionId(),
packet.getStatementId());
int updateCount = new
PostgreSQLBatchedStatementsExecutor(connectionSession, preparedStatement,
packet.readParameterSets(preparedStatement.getParameterTypes())).executeBatch();
return Arrays.asList(PostgreSQLBindCompletePacket.getInstance(),
createCommandComplete(preparedStatement.getSqlStatement(), updateCount));
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
index 8c13ca1d0b9..4242d13a40d 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
@@ -137,6 +137,7 @@ public final class PostgreSQLBatchedStatementsExecutor {
* @throws SQLException SQL exception
*/
public int executeBatch() throws SQLException {
+ connectionSession.getBackendConnection().handleAutoCommit();
addBatchedParametersToPreparedStatements();
return executeBatchedPreparedStatements();
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/bind/PostgreSQLComBindExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/bind/PostgreSQLComBindExecutor.java
index 2ade63af4ea..b3e9cd88998 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/bind/PostgreSQLComBindExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/bind/PostgreSQLComBindExecutor.java
@@ -47,6 +47,7 @@ public final class PostgreSQLComBindExecutor implements
CommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
+ connectionSession.getBackendConnection().handleAutoCommit();
PostgreSQLPreparedStatement preparedStatement =
PostgreSQLPreparedStatementRegistry.getInstance().get(connectionSession.getConnectionId(),
packet.getStatementId());
JDBCBackendConnection backendConnection = (JDBCBackendConnection)
connectionSession.getBackendConnection();
JDBCPortal portal = new JDBCPortal(packet.getPortal(),
preparedStatement,
packet.readParameters(preparedStatement.getParameterTypes()),
packet.readResultFormats(), backendConnection);