This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch opengauss_adapt
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/opengauss_adapt by this push:
new 3d8021c Support Portal for PostgreSQL Proxy (#10942)
3d8021c is described below
commit 3d8021c5bfcf38c598dcaa91f9d02014c7b2ebcc
Author: 吴伟杰 <[email protected]>
AuthorDate: Wed Jun 23 17:05:38 2021 +0800
Support Portal for PostgreSQL Proxy (#10942)
* Add OpenGauss Database type (#10292)
* Update OpenGaussDataSourceMetaData
* add jdbc:opengauss supoort (#10601)
* Fix : remove @Override annotation since getDatabaseType() has been
removed from Interface (#10836)
* Fix : fix checkstyle violation (#10838)
* Fix implements of OpenGaussParserFacade (#10849)
* Implements openGauss batch bind protocol (#10850)
* Implements openGauss BatchBind
* Add javadoc
* Add OpenGauss Database type (#10292)
* Update OpenGaussDataSourceMetaData
* add jdbc:opengauss supoort (#10601)
* Fix : remove @Override annotation since getDatabaseType() has been
removed from Interface (#10836)
* Fix : fix checkstyle violation (#10838)
* Fix implements of OpenGaussParserFacade (#10849)
* Implements openGauss batch bind protocol (#10850)
* Implements openGauss BatchBind
* Add javadoc
* Local openGauss JDBC driver
* Support portal for PostgreSQL Proxy
* Support Close Portal for PostgreSQL
* Enhance PostgreSQLComExecuteExecutor
* Move cached statements and result sets into DatabaseCommunicationEngine
* Complete tests for DatabaseCommunicationEngine
* Close TextProtocolBackendHandler correctly
* Complete Portal implementation
* Adapt openGauss batch bind
* Revert "Local openGauss JDBC driver"
This reverts commit 5a1828fb
Co-authored-by: Liang Zhang <[email protected]>
Co-authored-by: zhangliang <[email protected]>
Co-authored-by: justbk2015 <[email protected]>
Co-authored-by: 孙念君 Sun Nianjun <[email protected]>
---
.../binary/bind/OpenGaussComBatchBindPacket.java | 28 +----
.../query/binary/bind/PostgreSQLComBindPacket.java | 4 +-
.../binary/execute/PostgreSQLComExecutePacket.java | 10 +-
...t.java => PostgreSQLPortalSuspendedPacket.java} | 16 +--
.../communication/DatabaseCommunicationEngine.java | 75 +++++++++++-
.../backend/communication/ProxySQLExecutor.java | 9 +-
.../jdbc/connection/BackendConnection.java | 50 ++------
.../jdbc/executor/ProxyJDBCExecutor.java | 7 +-
.../callback/ProxyJDBCExecutorCallback.java | 12 +-
.../callback/ProxyJDBCExecutorCallbackFactory.java | 15 +--
.../ProxyPreparedStatementExecutorCallback.java | 6 +-
.../impl/ProxyStatementExecutorCallback.java | 6 +-
.../backend/text/TextProtocolBackendHandler.java | 8 ++
.../impl/SchemaAssignedDatabaseBackendHandler.java | 5 +
.../data/impl/UnicastDatabaseBackendHandler.java | 5 +
.../DatabaseCommunicationEngineTest.java | 99 ++++++++++++++--
.../jdbc/connection/BackendConnectionTest.java | 73 ------------
.../frontend/command/CommandExecutorTask.java | 20 ++--
.../netty/FrontendChannelInboundHandler.java | 3 +-
.../frontend/command/CommandExecutorTaskTest.java | 11 +-
.../execute/MySQLComStmtExecuteExecutor.java | 7 +-
.../fieldlist/MySQLComFieldListPacketExecutor.java | 7 +-
.../text/query/MySQLComQueryPacketExecutor.java | 5 +
.../command/PostgreSQLCommandExecutorFactory.java | 7 +-
.../command/PostgreSQLConnectionContext.java | 87 +++++++++++++-
.../command/query/binary/PostgreSQLPortal.java | 129 +++++++++++++++++++++
.../binary/bind/OpenGaussComBatchBindExecutor.java | 11 +-
.../binary/bind/PostgreSQLComBindExecutor.java | 105 ++---------------
.../binary/close/PostgreSQLComCloseExecutor.java | 22 ++--
.../execute/PostgreSQLComExecuteExecutor.java | 75 ++++++------
.../query/text/PostgreSQLComQueryExecutor.java | 22 +++-
.../PostgreSQLCommandExecuteEngineTest.java | 10 --
.../binary/bind/PostgreSQLComBindExecutorTest.java | 65 -----------
.../close/PostgreSQLComCloseExecutorTest.java | 15 ++-
.../execute/PostgreSQLComExecuteExecutorTest.java | 52 +++++----
.../query/text/PostgreSQLComQueryExecutorTest.java | 6 +-
.../frontend/command/executor/CommandExecutor.java | 8 ++
37 files changed, 633 insertions(+), 462 deletions(-)
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
index 8c290e9..9b767b1 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.bi
import com.google.common.collect.Lists;
import lombok.Getter;
import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
-import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnFormat;
+import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatement;
@@ -47,11 +47,11 @@ public final class OpenGaussComBatchBindPacket extends
PostgreSQLCommandPacket {
private final String statementId;
private final String sql;
-
- private final List<Integer> resultFormatCodes;
-
+
private final List<List<Object>> parameters;
+ private final List<PostgreSQLValueFormat> resultFormats;
+
public OpenGaussComBatchBindPacket(final PostgreSQLPacketPayload payload,
final int connectionId) {
payload.readInt4();
payload.readInt4();
@@ -63,9 +63,9 @@ public final class OpenGaussComBatchBindPacket extends
PostgreSQLCommandPacket {
parameterFormats.add(payload.readInt2());
}
int resultFormatsLength = payload.readInt2();
- resultFormatCodes = new ArrayList<>(resultFormatsLength);
+ resultFormats = new ArrayList<>(resultFormatsLength);
for (int i = 0; i < resultFormatsLength; i++) {
- resultFormatCodes.add(payload.readInt2());
+
resultFormats.add(PostgreSQLValueFormat.valueOf(payload.readInt2()));
}
PostgreSQLBinaryStatement binaryStatement =
PostgreSQLBinaryStatementRegistry.getInstance().get(connectionId).getBinaryStatement(statementId);
sql = null == binaryStatement ? null : binaryStatement.getSql();
@@ -152,22 +152,6 @@ public final class OpenGaussComBatchBindPacket extends
PostgreSQLCommandPacket {
return binaryProtocolValue.read(payload, parameterValueLength);
}
- /**
- * Get result format by column index.
- *
- * @param columnIndex column index
- * @return result format
- */
- public PostgreSQLColumnFormat getResultFormatByColumnIndex(final int
columnIndex) {
- if (resultFormatCodes.isEmpty()) {
- return PostgreSQLColumnFormat.TEXT;
- }
- if (1 == resultFormatCodes.size()) {
- return PostgreSQLColumnFormat.valueOf(resultFormatCodes.get(0));
- }
- return
PostgreSQLColumnFormat.valueOf(resultFormatCodes.get(columnIndex));
- }
-
@Override
public void write(final PostgreSQLPacketPayload payload) {
}
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/PostgreSQLComBindPacket.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/PostgreSQLComBindPacket.java
index 1ed09d3..7955d3b 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/PostgreSQLComBindPacket.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/PostgreSQLComBindPacket.java
@@ -43,6 +43,8 @@ import java.util.List;
@Getter
public final class PostgreSQLComBindPacket extends PostgreSQLCommandPacket {
+ private final String portal;
+
private final String statementId;
private final String sql;
@@ -53,7 +55,7 @@ public final class PostgreSQLComBindPacket extends
PostgreSQLCommandPacket {
public PostgreSQLComBindPacket(final PostgreSQLPacketPayload payload,
final int connectionId) {
payload.readInt4();
- payload.readStringNul();
+ portal = payload.readStringNul();
statementId = payload.readStringNul();
int parameterFormatCount = payload.readInt2();
List<Integer> parameterFormats = new ArrayList<>(parameterFormatCount);
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
index c4469c2..f7b499a 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute;
+import lombok.Getter;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
@@ -25,12 +26,17 @@ import
org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacket
/**
* Command execute packet for PostgreSQL.
*/
+@Getter
public final class PostgreSQLComExecutePacket extends PostgreSQLCommandPacket {
+ private final String portal;
+
+ private final int maxRows;
+
public PostgreSQLComExecutePacket(final PostgreSQLPacketPayload payload) {
payload.readInt4();
- payload.readStringNul();
- payload.readInt4();
+ portal = payload.readStringNul();
+ maxRows = payload.readInt4();
}
@Override
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLPortalSuspendedPacket.java
similarity index 69%
copy from
shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
copy to
shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLPortalSuspendedPacket.java
index c4469c2..abc8647 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLPortalSuspendedPacket.java
@@ -17,21 +17,15 @@
package
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
+import
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
+import
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
import
org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
/**
- * Command execute packet for PostgreSQL.
+ * Portal suspended packet for PostgreSQL.
*/
-public final class PostgreSQLComExecutePacket extends PostgreSQLCommandPacket {
-
- public PostgreSQLComExecutePacket(final PostgreSQLPacketPayload payload) {
- payload.readInt4();
- payload.readStringNul();
- payload.readInt4();
- }
+public final class PostgreSQLPortalSuspendedPacket implements
PostgreSQLIdentifierPacket {
@Override
public void write(final PostgreSQLPacketPayload payload) {
@@ -39,6 +33,6 @@ public final class PostgreSQLComExecutePacket extends
PostgreSQLCommandPacket {
@Override
public PostgreSQLIdentifierTag getIdentifier() {
- return PostgreSQLCommandPacketType.EXECUTE_COMMAND;
+ return PostgreSQLMessagePacketType.PORTAL_SUSPENDED;
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index 2f1a058..b770f31 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -44,11 +44,15 @@ import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryH
import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
/**
@@ -73,18 +77,40 @@ public final class DatabaseCommunicationEngine {
private ProxyLockEngine proxyLockEngine;
+ private final Collection<Statement> cachedStatements = new
CopyOnWriteArrayList<>();
+
+ private final Collection<ResultSet> cachedResultSets = new
CopyOnWriteArrayList<>();
+
public DatabaseCommunicationEngine(final String driverType, final
ShardingSphereMetaData metaData, final LogicSQL logicSQL, final
BackendConnection backendConnection) {
this.driverType = driverType;
this.metaData = metaData;
this.logicSQL = logicSQL;
- proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection);
+ proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection,
this);
kernelProcessor = new KernelProcessor();
- proxyLockEngine = new ProxyLockEngine(proxySQLExecutor, new
MetadataRefreshEngine(metaData,
-
ProxyContext.getInstance().getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()),
+ proxyLockEngine = new ProxyLockEngine(proxySQLExecutor, new
MetadataRefreshEngine(metaData,
+
ProxyContext.getInstance().getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()),
ProxyContext.getInstance().getMetaDataContexts().getProps(),
ProxyContext.getInstance().getLock().orElse(null)),
backendConnection.getSchemaName());
}
/**
+ * Add statement.
+ *
+ * @param statement statement to be added
+ */
+ public void add(final Statement statement) {
+ cachedStatements.add(statement);
+ }
+
+ /**
+ * Add result set.
+ *
+ * @param resultSet result set to be added
+ */
+ public void add(final ResultSet resultSet) {
+ cachedResultSets.add(resultSet);
+ }
+
+ /**
* Execute to database.
*
* @return backend response
@@ -192,4 +218,47 @@ public final class DatabaseCommunicationEngine {
private boolean isBinary() {
return JDBCDriverType.PREPARED_STATEMENT.equals(driverType);
}
+
+ /**
+ * Close database communication engine.
+ *
+ * @throws SQLException SQL exception
+ */
+ public void close() throws SQLException {
+ Collection<SQLException> result = new LinkedList<>();
+ result.addAll(closeResultSets());
+ result.addAll(closeStatements());
+ if (result.isEmpty()) {
+ return;
+ }
+ SQLException ex = new SQLException();
+ result.forEach(ex::setNextException);
+ throw ex;
+ }
+
+ private Collection<SQLException> closeResultSets() {
+ Collection<SQLException> result = new LinkedList<>();
+ for (ResultSet each : cachedResultSets) {
+ try {
+ each.close();
+ } catch (final SQLException ex) {
+ result.add(ex);
+ }
+ }
+ cachedResultSets.clear();
+ return result;
+ }
+
+ private Collection<SQLException> closeStatements() {
+ Collection<SQLException> result = new LinkedList<>();
+ for (Statement each : cachedStatements) {
+ try {
+ each.close();
+ } catch (final SQLException ex) {
+ result.add(ex);
+ }
+ }
+ cachedStatements.clear();
+ return result;
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index fba5049..0df7432 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -67,18 +67,21 @@ public final class ProxySQLExecutor {
private final BackendConnection backendConnection;
+ private final DatabaseCommunicationEngine databaseCommunicationEngine;
+
private final ProxyJDBCExecutor jdbcExecutor;
private final RawExecutor rawExecutor;
private final FederateExecutor federateExecutor;
- public ProxySQLExecutor(final String type, final BackendConnection
backendConnection) {
+ public ProxySQLExecutor(final String type, final BackendConnection
backendConnection, final DatabaseCommunicationEngine
databaseCommunicationEngine) {
this.type = type;
this.backendConnection = backendConnection;
+ this.databaseCommunicationEngine = databaseCommunicationEngine;
ExecutorEngine executorEngine =
BackendExecutorContext.getInstance().getExecutorEngine();
boolean isSerialExecute = backendConnection.isSerialExecute();
- jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, new
JDBCExecutor(executorEngine, isSerialExecute));
+ jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection,
databaseCommunicationEngine, new JDBCExecutor(executorEngine, isSerialExecute));
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getMetaDataContexts();
rawExecutor = new RawExecutor(executorEngine, isSerialExecute,
metaDataContexts.getProps());
// TODO Consider FederateRawExecutor
@@ -145,7 +148,7 @@ public final class ProxySQLExecutor {
}
MetaDataContexts metaData =
ProxyContext.getInstance().getMetaDataContexts();
ProxyJDBCExecutorCallback callback =
ProxyJDBCExecutorCallbackFactory.newInstance(type,
metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(),
- executionContext.getSqlStatementContext().getSqlStatement(),
backendConnection, isReturnGeneratedKeys, isExceptionThrown, true);
+ executionContext.getSqlStatementContext().getSqlStatement(),
databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, true);
backendConnection.setFederateExecutor(federateExecutor);
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaData);
return federateExecutor.executeQuery(executionContext, callback,
prepareEngine).stream().map(each -> (ExecuteResult)
each).collect(Collectors.toList());
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 1bcffb8..0beea56 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.Statemen
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -39,7 +40,6 @@ import
org.apache.shardingsphere.transaction.core.TransactionType;
import java.sql.Connection;
import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
@@ -74,9 +74,7 @@ public final class BackendConnection implements
ExecutorJDBCManager {
private final Multimap<String, Connection> cachedConnections =
LinkedHashMultimap.create();
- private final Collection<Statement> cachedStatements = new
CopyOnWriteArrayList<>();
-
- private final Collection<ResultSet> cachedResultSets = new
CopyOnWriteArrayList<>();
+ private final Collection<DatabaseCommunicationEngine>
cachedDatabaseCommunicationEngines = new CopyOnWriteArrayList<>();
private final Collection<ConnectionPostProcessor> connectionPostProcessors
= new LinkedList<>();
@@ -213,56 +211,28 @@ public final class BackendConnection implements
ExecutorJDBCManager {
}
/**
- * Add statement.
- *
- * @param statement statement to be added
- */
- public void add(final Statement statement) {
- cachedStatements.add(statement);
- }
-
- /**
- * Add result set.
+ * Add database communication engine.
*
- * @param resultSet result set to be added
+ * @param databaseCommunicationEngine database communication engine to be
added
*/
- public void add(final ResultSet resultSet) {
- cachedResultSets.add(resultSet);
- }
-
- /**
- * Close result sets.
- *
- * @return SQL exception when result sets close
- */
- public synchronized Collection<SQLException> closeResultSets() {
- Collection<SQLException> result = new LinkedList<>();
- for (ResultSet each : cachedResultSets) {
- try {
- each.close();
- } catch (final SQLException ex) {
- result.add(ex);
- }
- }
- cachedResultSets.clear();
- return result;
+ public void add(final DatabaseCommunicationEngine
databaseCommunicationEngine) {
+ cachedDatabaseCommunicationEngines.add(databaseCommunicationEngine);
}
/**
- * Close statements.
+ * Close database communication engines.
*
- * @return SQL exception when statements close
+ * @return SQL exception when engine close
*/
- public synchronized Collection<SQLException> closeStatements() {
+ public synchronized Collection<SQLException>
closeDatabaseCommunicationEngines() {
Collection<SQLException> result = new LinkedList<>();
- for (Statement each : cachedStatements) {
+ for (DatabaseCommunicationEngine each :
cachedDatabaseCommunicationEngines) {
try {
each.close();
} catch (final SQLException ex) {
result.add(ex);
}
}
- cachedStatements.clear();
return result;
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 4087d10..d6e12c6 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -44,6 +45,8 @@ public final class ProxyJDBCExecutor {
private final BackendConnection backendConnection;
+ private final DatabaseCommunicationEngine databaseCommunicationEngine;
+
@Getter
private final JDBCExecutor jdbcExecutor;
@@ -64,8 +67,8 @@ public final class ProxyJDBCExecutor {
DatabaseType databaseType =
metaDataContexts.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType();
ExecuteProcessEngine.initialize(context, executionGroupContext,
metaDataContexts.getProps());
Collection<ExecuteResult> result =
jdbcExecutor.execute(executionGroupContext,
- ProxyJDBCExecutorCallbackFactory.newInstance(type,
databaseType, context.getSqlStatement(), backendConnection,
isReturnGeneratedKeys, isExceptionThrown, true),
- ProxyJDBCExecutorCallbackFactory.newInstance(type,
databaseType, context.getSqlStatement(), backendConnection,
isReturnGeneratedKeys, isExceptionThrown, false));
+ ProxyJDBCExecutorCallbackFactory.newInstance(type,
databaseType, context.getSqlStatement(), databaseCommunicationEngine,
isReturnGeneratedKeys, isExceptionThrown, true),
+ ProxyJDBCExecutorCallbackFactory.newInstance(type,
databaseType, context.getSqlStatement(), databaseCommunicationEngine,
isReturnGeneratedKeys, isExceptionThrown, false));
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
return result;
} finally {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
index 2d05d4f..3285833 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
@@ -28,7 +28,7 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -43,7 +43,7 @@ import java.util.Optional;
*/
public abstract class ProxyJDBCExecutorCallback extends
JDBCExecutorCallback<ExecuteResult> {
- private final BackendConnection backendConnection;
+ private final DatabaseCommunicationEngine databaseCommunicationEngine;
private final boolean isReturnGeneratedKeys;
@@ -51,10 +51,10 @@ public abstract class ProxyJDBCExecutorCallback extends
JDBCExecutorCallback<Exe
private boolean hasMetaData;
- public ProxyJDBCExecutorCallback(final DatabaseType databaseType, final
SQLStatement sqlStatement, final BackendConnection backendConnection,
+ public ProxyJDBCExecutorCallback(final DatabaseType databaseType, final
SQLStatement sqlStatement, final DatabaseCommunicationEngine
databaseCommunicationEngine,
final boolean isReturnGeneratedKeys,
final boolean isExceptionThrown, final boolean fetchMetaData) {
super(databaseType, sqlStatement, isExceptionThrown);
- this.backendConnection = backendConnection;
+ this.databaseCommunicationEngine = databaseCommunicationEngine;
this.isReturnGeneratedKeys = isReturnGeneratedKeys;
this.fetchMetaData = fetchMetaData;
}
@@ -69,10 +69,10 @@ public abstract class ProxyJDBCExecutorCallback extends
JDBCExecutorCallback<Exe
}
private ExecuteResult executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode, final boolean withMetadata)
throws SQLException {
- backendConnection.add(statement);
+ databaseCommunicationEngine.add(statement);
if (execute(sql, statement, isReturnGeneratedKeys)) {
ResultSet resultSet = statement.getResultSet();
- backendConnection.add(resultSet);
+ databaseCommunicationEngine.add(resultSet);
return createQueryResult(resultSet, connectionMode);
}
return new UpdateResult(statement.getUpdateCount(),
isReturnGeneratedKeys ? getGeneratedKey(statement) : 0L);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
index 3967393..f7edf40 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
@@ -21,7 +21,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyPreparedStatementExecutorCallback;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyStatementExecutorCallback;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -34,23 +34,24 @@ public final class ProxyJDBCExecutorCallbackFactory {
/**
* Create new instance of Proxy JDBC executor callback.
- *
+ *
* @param type driver type
* @param databaseType database type
* @param sqlStatement SQL statement
- * @param backendConnection backend connection
+ * @param databaseCommunicationEngine database communication engine
* @param isReturnGeneratedKeys is return generated keys or not
* @param isExceptionThrown is exception thrown or not
* @param isFetchMetaData is fetch meta data or not
* @return instance of Proxy JDBC executor callback
*/
- public static ProxyJDBCExecutorCallback newInstance(final String type,
final DatabaseType databaseType, final SQLStatement sqlStatement, final
BackendConnection backendConnection,
- final boolean
isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean
isFetchMetaData) {
+ public static ProxyJDBCExecutorCallback newInstance(final String type,
final DatabaseType databaseType, final SQLStatement sqlStatement,
+ final
DatabaseCommunicationEngine databaseCommunicationEngine, final boolean
isReturnGeneratedKeys, final boolean isExceptionThrown,
+ final boolean
isFetchMetaData) {
if (JDBCDriverType.STATEMENT.equals(type)) {
- return new ProxyStatementExecutorCallback(databaseType,
sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown,
isFetchMetaData);
+ return new ProxyStatementExecutorCallback(databaseType,
sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys,
isExceptionThrown, isFetchMetaData);
}
if (JDBCDriverType.PREPARED_STATEMENT.equals(type)) {
- return new ProxyPreparedStatementExecutorCallback(databaseType,
sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown,
isFetchMetaData);
+ return new ProxyPreparedStatementExecutorCallback(databaseType,
sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys,
isExceptionThrown, isFetchMetaData);
}
throw new UnsupportedOperationException(String.format("Unsupported
driver type: `%s`", type));
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
index 67bcf1c..f8b4a60 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -31,9 +31,9 @@ import java.sql.Statement;
*/
public final class ProxyPreparedStatementExecutorCallback extends
ProxyJDBCExecutorCallback {
- public ProxyPreparedStatementExecutorCallback(final DatabaseType
databaseType, final SQLStatement sqlStatement, final BackendConnection
backendConnection,
+ public ProxyPreparedStatementExecutorCallback(final DatabaseType
databaseType, final SQLStatement sqlStatement, final
DatabaseCommunicationEngine databaseCommunicationEngine,
final boolean
isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean
fetchMetaData) {
- super(databaseType, sqlStatement, backendConnection,
isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+ super(databaseType, sqlStatement, databaseCommunicationEngine,
isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
}
@Override
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
index 88647fc..17c2044 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -30,9 +30,9 @@ import java.sql.Statement;
*/
public final class ProxyStatementExecutorCallback extends
ProxyJDBCExecutorCallback {
- public ProxyStatementExecutorCallback(final DatabaseType databaseType,
final SQLStatement sqlStatement, final BackendConnection backendConnection,
+ public ProxyStatementExecutorCallback(final DatabaseType databaseType,
final SQLStatement sqlStatement, final DatabaseCommunicationEngine
databaseCommunicationEngine,
final boolean isReturnGeneratedKeys,
final boolean isExceptionThrown, final boolean fetchMetaData) {
- super(databaseType, sqlStatement, backendConnection,
isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+ super(databaseType, sqlStatement, databaseCommunicationEngine,
isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
}
@Override
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
index f87cb1d..5c4e12f 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
@@ -55,4 +55,12 @@ public interface TextProtocolBackendHandler {
default Collection<Object> getRowData() throws SQLException {
return Collections.emptyList();
}
+
+ /**
+ * Close handler.
+ *
+ * @throws SQLException SQL exception
+ */
+ default void close() throws SQLException {
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
index f2633a8..8d8ad45 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
@@ -64,4 +64,9 @@ public final class SchemaAssignedDatabaseBackendHandler
implements DatabaseBacke
public Collection<Object> getRowData() throws SQLException {
return databaseCommunicationEngine.getQueryResponseRow().getData();
}
+
+ @Override
+ public void close() throws SQLException {
+ databaseCommunicationEngine.close();
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
index 57ec70f..3871ad1 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
@@ -74,4 +74,9 @@ public final class UnicastDatabaseBackendHandler implements
DatabaseBackendHandl
public Collection<Object> getRowData() throws SQLException {
return databaseCommunicationEngine.getQueryResponseRow().getData();
}
+
+ @Override
+ public void close() throws SQLException {
+ databaseCommunicationEngine.close();
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
index 5677758..e2768a4 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
@@ -17,11 +17,11 @@
package org.apache.shardingsphere.proxy.backend.communication;
+import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
import
org.apache.shardingsphere.infra.context.metadata.impl.StandardMetaDataContexts;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
@@ -44,11 +44,17 @@ import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLSta
import
org.apache.shardingsphere.transaction.context.impl.StandardTransactionContexts;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.mockito.internal.util.reflection.FieldSetter;
+import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Types;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -56,18 +62,33 @@ import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public final class DatabaseCommunicationEngineTest {
+ @Mock
+ private BackendConnection backendConnection;
+
+ @Mock
+ private Statement statement;
+
+ @Mock
+ private ResultSet resultSet;
+
@Before
public void setUp() {
- MetaDataContexts metaDataContexts = new
StandardMetaDataContexts(mockMetaDataMap(),
mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class),
+ when(backendConnection.getSchemaName()).thenReturn("schema");
+ MetaDataContexts metaDataContexts = new
StandardMetaDataContexts(mockMetaDataMap(),
mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class),
new ConfigurationProperties(new Properties()));
ProxyContext.getInstance().init(metaDataContexts, new
StandardTransactionContexts());
}
@@ -81,8 +102,6 @@ public final class DatabaseCommunicationEngineTest {
@Test
public void assertBinaryProtocolQueryHeader() throws SQLException,
NoSuchFieldException {
- BackendConnection backendConnection = mock(BackendConnection.class);
- when(backendConnection.getSchemaName()).thenReturn("schema");
DatabaseCommunicationEngine engine =
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
"schemaName", Collections.emptyList(), backendConnection);
assertNotNull(engine);
@@ -95,7 +114,7 @@ public final class DatabaseCommunicationEngineTest {
private MemoryQueryResultRow memoryQueryResultRow;
@Override
- protected List<MemoryQueryResultRow> init(final ShardingSphereRule
rule, final ShardingSphereSchema schema,
+ protected List<MemoryQueryResultRow> init(final ShardingSphereRule
rule, final ShardingSphereSchema schema,
final
SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults) {
memoryQueryResultRow = mock(MemoryQueryResultRow.class);
return Collections.singletonList(memoryQueryResultRow);
@@ -117,7 +136,6 @@ public final class DatabaseCommunicationEngineTest {
ShardingSphereSchema schema = mock(ShardingSphereSchema.class);
when(schema.get("t_logic_order")).thenReturn(new
TableMetaData(Collections.singletonList(columnMetaData),
Collections.singletonList(new IndexMetaData("order_id"))));
DataSourcesMetaData dataSourcesMetaData =
mock(DataSourcesMetaData.class);
-
when(dataSourcesMetaData.getDataSourceMetaData("ds_0")).thenReturn(mock(DataSourceMetaData.class));
when(result.getResource().getDataSourcesMetaData()).thenReturn(dataSourcesMetaData);
when(result.getSchema()).thenReturn(schema);
ShardingRule shardingRule = mock(ShardingRule.class);
@@ -132,7 +150,6 @@ public final class DatabaseCommunicationEngineTest {
when(result.getTableName(1)).thenReturn("t_order");
when(result.getColumnLabel(1)).thenReturn("order_id");
when(result.getColumnName(1)).thenReturn("order_id");
- when(result.getColumnName(2)).thenReturn("expr");
when(result.getColumnType(1)).thenReturn(Types.INTEGER);
when(result.isSigned(1)).thenReturn(true);
when(result.isAutoIncrement(1)).thenReturn(true);
@@ -141,4 +158,72 @@ public final class DatabaseCommunicationEngineTest {
when(result.isNotNull(1)).thenReturn(true);
return result;
}
+
+ @Test
+ public void assertAddStatementCorrectly() {
+ DatabaseCommunicationEngine engine =
+
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
"schemaName", Collections.emptyList(), backendConnection);
+ engine.add(statement);
+ Collection<?> actual = getField(engine, "cachedStatements");
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), is(statement));
+ }
+
+ @Test
+ public void assertAddResultSetCorrectly() {
+ DatabaseCommunicationEngine engine =
+
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
"schemaName", Collections.emptyList(), backendConnection);
+ engine.add(resultSet);
+ Collection<?> actual = getField(engine, "cachedResultSets");
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), is(resultSet));
+ }
+
+ @Test
+ public void assertCloseCorrectly() throws SQLException {
+ DatabaseCommunicationEngine engine =
+
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
"schemaName", Collections.emptyList(), backendConnection);
+ Collection<ResultSet> cachedResultSets = getField(engine,
"cachedResultSets");
+ cachedResultSets.add(resultSet);
+ Collection<Statement> cachedStatements = getField(engine,
"cachedStatements");
+ cachedStatements.add(statement);
+ engine.close();
+ verify(resultSet).close();
+ verify(statement).close();
+ assertTrue(cachedResultSets.isEmpty());
+ assertTrue(cachedStatements.isEmpty());
+ }
+
+ @Test
+ public void assertCloseResultSetsWithExceptionThrown() throws SQLException
{
+ DatabaseCommunicationEngine engine =
+
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
"schemaName", Collections.emptyList(), backendConnection);
+ Collection<ResultSet> cachedResultSets = getField(engine,
"cachedResultSets");
+ SQLException sqlExceptionByResultSet = new SQLException("ResultSet");
+ doThrow(sqlExceptionByResultSet).when(resultSet).close();
+ cachedResultSets.add(resultSet);
+ Collection<Statement> cachedStatements = getField(engine,
"cachedStatements");
+ SQLException sqlExceptionByStatement = new SQLException("Statement");
+ doThrow(sqlExceptionByStatement).when(statement).close();
+ cachedStatements.add(statement);
+ SQLException actual = null;
+ try {
+ engine.close();
+ } catch (final SQLException ex) {
+ actual = ex;
+ }
+ verify(resultSet).close();
+ verify(statement).close();
+ assertTrue(cachedResultSets.isEmpty());
+ assertTrue(cachedStatements.isEmpty());
+ assertThat(actual.getNextException(), is(sqlExceptionByResultSet));
+ assertThat(actual.getNextException().getNextException(),
is(sqlExceptionByStatement));
+ }
+
+ @SneakyThrows
+ private <T> T getField(final DatabaseCommunicationEngine target, final
String fieldName) {
+ Field field =
DatabaseCommunicationEngine.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return (T) field.get(target);
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
index b7498eb..b9e3ea8 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
@@ -44,7 +44,6 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
@@ -260,78 +259,6 @@ public final class BackendConnectionTest {
}
@Test
- public void assertAddStatementCorrectly() throws NoSuchFieldException,
IllegalAccessException {
- Statement statement = mock(Statement.class);
- backendConnection.add(statement);
- Field field =
backendConnection.getClass().getDeclaredField("cachedStatements");
- field.setAccessible(true);
- assertTrue(((Collection<?>)
field.get(backendConnection)).contains(statement));
- }
-
- @Test
- public void assertAddResultSetCorrectly() throws NoSuchFieldException,
IllegalAccessException {
- ResultSet resultSet = mock(ResultSet.class);
- backendConnection.add(resultSet);
- Field field =
backendConnection.getClass().getDeclaredField("cachedResultSets");
- field.setAccessible(true);
- assertTrue(((Collection<?>)
field.get(backendConnection)).contains(resultSet));
- }
-
- @Test
- public void assertCloseResultSetsCorrectly() throws NoSuchFieldException,
SQLException, IllegalAccessException {
- Field field =
backendConnection.getClass().getDeclaredField("cachedResultSets");
- field.setAccessible(true);
- Collection<ResultSet> cachedResultSets = (Collection<ResultSet>)
field.get(backendConnection);
- ResultSet resultSet = mock(ResultSet.class);
- cachedResultSets.add(resultSet);
- backendConnection.closeResultSets();
- verify(resultSet, times(1)).close();
- assertTrue(cachedResultSets.isEmpty());
- }
-
- @Test
- public void assertCloseResultSetsWithExceptionThrown() throws
NoSuchFieldException, SQLException, IllegalAccessException {
- Field field =
backendConnection.getClass().getDeclaredField("cachedResultSets");
- field.setAccessible(true);
- Collection<ResultSet> cachedResultSets = (Collection<ResultSet>)
field.get(backendConnection);
- ResultSet resultSet = mock(ResultSet.class);
- SQLException sqlException = new SQLException("");
- doThrow(sqlException).when(resultSet).close();
- cachedResultSets.add(resultSet);
- Collection<SQLException> result = backendConnection.closeResultSets();
- verify(resultSet, times(1)).close();
- assertTrue(cachedResultSets.isEmpty());
- assertTrue(result.contains(sqlException));
- }
-
- @Test
- public void assertCloseStatementsCorrectly() throws NoSuchFieldException,
SQLException, IllegalAccessException {
- Field field =
backendConnection.getClass().getDeclaredField("cachedStatements");
- field.setAccessible(true);
- Collection<Statement> cachedStatement = (Collection<Statement>)
field.get(backendConnection);
- Statement statement = mock(Statement.class);
- cachedStatement.add(statement);
- backendConnection.closeStatements();
- verify(statement, times(1)).close();
- assertTrue(cachedStatement.isEmpty());
- }
-
- @Test
- public void assertCloseStatementsWithExceptionThrown() throws
SQLException, NoSuchFieldException, IllegalAccessException {
- Field field =
backendConnection.getClass().getDeclaredField("cachedStatements");
- field.setAccessible(true);
- Collection<Statement> cachedStatement = (Collection<Statement>)
field.get(backendConnection);
- Statement statement = mock(Statement.class);
- cachedStatement.add(statement);
- SQLException sqlException = new SQLException("");
- doThrow(sqlException).when(statement).close();
- Collection<SQLException> result = backendConnection.closeStatements();
- verify(statement, times(1)).close();
- assertTrue(cachedStatement.isEmpty());
- assertTrue(result.contains(sqlException));
- }
-
- @Test
public void assertCloseConnectionsCorrectlyWhenNotForceRollback() throws
NoSuchFieldException, IllegalAccessException, SQLException {
Field field =
backendConnection.getClass().getDeclaredField("cachedConnections");
field.setAccessible(true);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index e92d22f..d55dabd 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -90,13 +90,17 @@ public final class CommandExecutorTask implements Runnable {
CommandPacketType type =
commandExecuteEngine.getCommandPacketType(payload);
CommandPacket commandPacket =
commandExecuteEngine.getCommandPacket(payload, type, backendConnection);
CommandExecutor commandExecutor =
commandExecuteEngine.getCommandExecutor(type, commandPacket, backendConnection);
- Collection<DatabasePacket<?>> responsePackets =
commandExecutor.execute();
- if (responsePackets.isEmpty()) {
- return false;
- }
- responsePackets.forEach(context::write);
- if (commandExecutor instanceof QueryCommandExecutor) {
- return commandExecuteEngine.writeQueryData(context,
backendConnection, (QueryCommandExecutor) commandExecutor,
responsePackets.size());
+ try {
+ Collection<DatabasePacket<?>> responsePackets =
commandExecutor.execute();
+ if (responsePackets.isEmpty()) {
+ return false;
+ }
+ responsePackets.forEach(context::write);
+ if (commandExecutor instanceof QueryCommandExecutor) {
+ return commandExecuteEngine.writeQueryData(context,
backendConnection, (QueryCommandExecutor) commandExecutor,
responsePackets.size());
+ }
+ } finally {
+ commandExecutor.close();
}
return
databaseProtocolFrontendEngine.getFrontendContext().isFlushForPerCommandPacket();
}
@@ -113,8 +117,6 @@ public final class CommandExecutorTask implements Runnable {
private Collection<SQLException> closeExecutionResources() {
Collection<SQLException> result = new LinkedList<>();
PrimaryVisitedManager.clear();
- result.addAll(backendConnection.closeResultSets());
- result.addAll(backendConnection.closeStatements());
result.addAll(backendConnection.closeFederateExecutor());
return result;
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index d4d2984..cd6c297 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -94,8 +94,7 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
private void closeAllResources() {
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(backendConnection.getConnectionId());
PrimaryVisitedManager.clear();
- backendConnection.closeResultSets();
- backendConnection.closeStatements();
+ backendConnection.closeDatabaseCommunicationEngines();
backendConnection.closeConnections(true);
backendConnection.closeFederateExecutor();
databaseProtocolFrontendEngine.release(backendConnection);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 822d715..b9422df 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -102,14 +102,13 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
-
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
verify(connectionStatus).switchToUsing();
+ verify(queryCommandExecutor).close();
}
@Test
@@ -123,8 +122,6 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
-
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
@@ -134,6 +131,7 @@ public final class CommandExecutorTaskTest {
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
verify(executeEngine).writeQueryData(handlerContext,
backendConnection, queryCommandExecutor, 1);
+ verify(queryCommandExecutor).close();
}
@Test
@@ -148,8 +146,6 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
-
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
@@ -158,6 +154,7 @@ public final class CommandExecutorTaskTest {
verify(connectionStatus).switchToUsing();
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
+ verify(commandExecutor).close();
}
@Test
@@ -169,8 +166,6 @@ public final class CommandExecutorTaskTest {
when(executeEngine.getErrorPacket(mockException,
backendConnection)).thenReturn(databasePacket);
when(executeEngine.getOtherPacket(backendConnection)).thenReturn(Optional.of(databasePacket));
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
-
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index a6569cd..afcde18 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -25,8 +25,8 @@ import
org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
@@ -111,4 +111,9 @@ public final class MySQLComStmtExecuteExecutor implements
QueryCommandExecutor {
return new BinaryRow(queryResponseRow.getCells().stream().map(
each -> new
BinaryCell(MySQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell)
each).getJdbcType()), each.getData())).collect(Collectors.toList()));
}
+
+ @Override
+ public void close() throws SQLException {
+ databaseCommunicationEngine.close();
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
index 3e2b1bb..9f7e811 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
@@ -23,12 +23,12 @@ import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fie
import
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
@@ -76,4 +76,9 @@ public final class MySQLComFieldListPacketExecutor implements
CommandExecutor {
result.add(new MySQLEofPacket(++currentSequenceId));
return result;
}
+
+ @Override
+ public void close() throws SQLException {
+ databaseCommunicationEngine.close();
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
index 9984604..9d06568 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
@@ -79,4 +79,9 @@ public final class MySQLComQueryPacketExecutor implements
QueryCommandExecutor {
public MySQLPacket getQueryRowPacket() throws SQLException {
return new MySQLTextResultSetRowPacket(++currentSequenceId,
textProtocolBackendHandler.getRowData());
}
+
+ @Override
+ public void close() throws SQLException {
+ textProtocolBackendHandler.close();
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
index 6da37e2..f5bb0c1 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQ
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.OpenGaussComBatchBindPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket;
+import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLComExecutePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.parse.PostgreSQLComParsePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLComQueryPacket;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
@@ -65,7 +66,7 @@ public final class PostgreSQLCommandExecutorFactory {
log.debug("Execute packet type: {}, value: {}", commandPacketType,
commandPacket);
switch (commandPacketType) {
case SIMPLE_QUERY:
- return new
PostgreSQLComQueryExecutor((PostgreSQLComQueryPacket) commandPacket,
backendConnection);
+ return new PostgreSQLComQueryExecutor(connectionContext,
(PostgreSQLComQueryPacket) commandPacket, backendConnection);
case PARSE_COMMAND:
return new PostgreSQLComParseExecutor(connectionContext,
(PostgreSQLComParsePacket) commandPacket, backendConnection);
case BIND_COMMAND:
@@ -77,11 +78,11 @@ public final class PostgreSQLCommandExecutorFactory {
connectionContext.getPendingExecutors().add(new
PostgreSQLComDescribeExecutor(connectionContext));
break;
case EXECUTE_COMMAND:
- return new PostgreSQLComExecuteExecutor(connectionContext);
+ return new PostgreSQLComExecuteExecutor(connectionContext,
(PostgreSQLComExecutePacket) commandPacket);
case SYNC_COMMAND:
return new PostgreSQLComSyncExecutor(connectionContext,
backendConnection);
case CLOSE_COMMAND:
- connectionContext.getPendingExecutors().add(new
PostgreSQLComCloseExecutor((PostgreSQLComClosePacket) commandPacket,
backendConnection));
+ connectionContext.getPendingExecutors().add(new
PostgreSQLComCloseExecutor(connectionContext, (PostgreSQLComClosePacket)
commandPacket, backendConnection));
break;
case TERMINATE:
return new PostgreSQLComTerminationExecutor();
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
index 43a01ed..c05f4ad 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
@@ -19,28 +19,113 @@ package
org.apache.shardingsphere.proxy.frontend.postgresql.command;
import lombok.Getter;
import lombok.Setter;
+import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
+import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.PostgreSQLPortal;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe.PostgreSQLComDescribeExecutor;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import java.sql.SQLException;
import java.util.Collection;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
* PostgreSQL connection context.
*/
-@Getter
@Setter
public final class PostgreSQLConnectionContext {
+ private final Map<String, PostgreSQLPortal> portals = new
LinkedHashMap<>();
+
+ @Getter
private final Collection<CommandExecutor> pendingExecutors = new
LinkedList<>();
private SQLStatement sqlStatement;
+ @Getter
private long updateCount;
/**
+ * Create a portal.
+ *
+ * @param portal portal name
+ * @param sql sql
+ * @param parameters bind parameters
+ * @param resultFormats result formats
+ * @param backendConnection backend connection
+ * @return a new portal
+ */
+ public PostgreSQLPortal createPortal(final String portal, final String
sql, final List<Object> parameters, final List<PostgreSQLValueFormat>
resultFormats, final BackendConnection backendConnection) throws SQLException {
+ if (!getSqlStatement().isPresent()) {
+ SQLStatement result = parseSql(sql,
backendConnection.getSchemaName());
+ setSqlStatement(result);
+ }
+ PostgreSQLPortal result = new PostgreSQLPortal(sqlStatement, sql,
parameters, resultFormats, backendConnection);
+ portals.put(portal, result);
+ return result;
+ }
+
+ private SQLStatement parseSql(final String sql, final String schemaName) {
+ if (sql.isEmpty()) {
+ return new EmptyStatement();
+ }
+ ShardingSphereSQLParserEngine sqlStatementParserEngine = new
ShardingSphereSQLParserEngine(
+
DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType()));
+ return sqlStatementParserEngine.parse(sql, true);
+ }
+
+ /**
+ * Get portal.
+ *
+ * @param portal portal name
+ * @return portal
+ */
+ public PostgreSQLPortal getPortal(final String portal) {
+ return portals.get(portal);
+ }
+
+ /**
+ * Close portal.
+ *
+ * @param portal portal name
+ */
+ public void closePortal(final String portal) throws SQLException {
+ PostgreSQLPortal result = portals.remove(portal);
+ if (null != result) {
+ result.close();
+ }
+ }
+
+ /**
+ * Close all portals.
+ */
+ public void closeAllPortals() {
+ Collection<SQLException> result = new LinkedList<>();
+ for (PostgreSQLPortal each : portals.values()) {
+ try {
+ each.close();
+ } catch (final SQLException ex) {
+ result.add(ex);
+ }
+ }
+ portals.clear();
+ if (result.isEmpty()) {
+ return;
+ }
+ SQLException ex = new SQLException("Close all portals failed.");
+ result.forEach(ex::setNextException);
+ }
+
+ /**
* Get describe command executor.
*
* @return describe command executor
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/PostgreSQLPortal.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/PostgreSQLPortal.java
new file mode 100644
index 0000000..ce57367
--- /dev/null
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/PostgreSQLPortal.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary;
+
+import lombok.Getter;
+import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
+import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
+import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
+import
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
+import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
+import
org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
+import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
+import
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * PostgreSQL portal.
+ */
+@Getter
+public final class PostgreSQLPortal {
+
+ private final List<PostgreSQLValueFormat> resultFormats;
+
+ private final DatabaseCommunicationEngine databaseCommunicationEngine;
+
+ private final TextProtocolBackendHandler textProtocolBackendHandler;
+
+ public PostgreSQLPortal(final SQLStatement sqlStatement, final String sql,
final List<Object> parameters, final List<PostgreSQLValueFormat> resultFormats,
final BackendConnection backendConnection) throws SQLException {
+ this.resultFormats = resultFormats;
+ if (sqlStatement instanceof TCLStatement || sqlStatement instanceof
EmptyStatement) {
+ databaseCommunicationEngine = null;
+ textProtocolBackendHandler =
TextProtocolBackendHandlerFactory.newInstance(DatabaseTypeRegistry.getActualDatabaseType("PostgreSQL"),
sql, backendConnection);
+ return;
+ }
+ databaseCommunicationEngine =
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatement,
sql, parameters, backendConnection);
+ textProtocolBackendHandler = null;
+ }
+
+ /**
+ * Execute portal.
+ *
+ * @return response header
+ * @throws SQLException SQL exception
+ */
+ public ResponseHeader execute() throws SQLException {
+ return null != databaseCommunicationEngine ?
databaseCommunicationEngine.execute() : textProtocolBackendHandler.execute();
+ }
+
+ /**
+ * Next.
+ *
+ * @return true if the portal has next packet; else false
+ * @throws SQLException SQL exception
+ */
+ public boolean next() throws SQLException {
+ return null != databaseCommunicationEngine &&
databaseCommunicationEngine.next();
+ }
+
+ /**
+ * Fetch next packet from the portal.
+ *
+ * @return next packet
+ * @throws SQLException SQL exception
+ */
+ public PostgreSQLPacket nextPacket() throws SQLException {
+ QueryResponseRow queryResponseRow =
databaseCommunicationEngine.getQueryResponseRow();
+ return new PostgreSQLDataRowPacket(getData(queryResponseRow));
+ }
+
+ private List<Object> getData(final QueryResponseRow queryResponseRow) {
+ Collection<QueryResponseCell> cells = queryResponseRow.getCells();
+ List<Object> result = new ArrayList<>(cells.size());
+ List<QueryResponseCell> columns = new ArrayList<>(cells);
+ for (int i = 0; i < columns.size(); i++) {
+ PostgreSQLValueFormat format = determineValueFormat(i);
+ result.add(PostgreSQLValueFormat.BINARY == format ?
createBinaryCell(columns.get(i)) : columns.get(i).getData());
+ }
+ return result;
+ }
+
+ private PostgreSQLValueFormat determineValueFormat(final int columnIndex) {
+ return resultFormats.isEmpty() ? PostgreSQLValueFormat.TEXT :
resultFormats.get(columnIndex % resultFormats.size());
+ }
+
+ private BinaryCell createBinaryCell(final QueryResponseCell cell) {
+ return new
BinaryCell(PostgreSQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell)
cell).getJdbcType()), cell.getData());
+ }
+
+ /**
+ * Close portal.
+ */
+ public void close() throws SQLException {
+ if (null != databaseCommunicationEngine) {
+ databaseCommunicationEngine.close();
+ }
+ if (null != textProtocolBackendHandler) {
+ textProtocolBackendHandler.close();
+ }
+ }
+}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
index b85a712..ee879bd 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
@@ -22,7 +22,7 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
-import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnFormat;
+import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
@@ -159,12 +159,17 @@ public final class OpenGaussComBatchBindExecutor
implements QueryCommandExecutor
List<Object> result = new ArrayList<>(cells.size());
List<QueryResponseCell> columns = new ArrayList<>(cells);
for (int i = 0; i < columns.size(); i++) {
- PostgreSQLColumnFormat format =
packet.getResultFormatByColumnIndex(i);
- result.add(PostgreSQLColumnFormat.BINARY == format ?
createBinaryCell(columns.get(i)) : columns.get(i).getData());
+ PostgreSQLValueFormat format = determineValueFormat(i);
+ result.add(PostgreSQLValueFormat.BINARY == format ?
createBinaryCell(columns.get(i)) : columns.get(i).getData());
}
return result;
}
+ private PostgreSQLValueFormat determineValueFormat(final int columnIndex) {
+ List<PostgreSQLValueFormat> resultFormats = packet.getResultFormats();
+ return resultFormats.isEmpty() ? PostgreSQLValueFormat.TEXT :
resultFormats.get(columnIndex % resultFormats.size());
+ }
+
private BinaryCell createBinaryCell(final QueryResponseCell cell) {
return new
BinaryCell(PostgreSQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell)
cell).getJdbcType()), cell.getData());
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
index 78a1a31..200e621 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
@@ -17,42 +17,22 @@
package
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
-import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
-import
org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
-import
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
-import
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
-import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
+import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.PostgreSQLPortal;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -61,7 +41,7 @@ import java.util.List;
* Command bind executor for PostgreSQL.
*/
@RequiredArgsConstructor
-public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
+public final class PostgreSQLComBindExecutor implements CommandExecutor {
private final PostgreSQLConnectionContext connectionContext;
@@ -69,60 +49,22 @@ public final class PostgreSQLComBindExecutor implements
QueryCommandExecutor {
private final BackendConnection backendConnection;
- private DatabaseCommunicationEngine databaseCommunicationEngine;
-
- private TextProtocolBackendHandler textProtocolBackendHandler;
-
- @Getter
- private volatile ResponseType responseType;
-
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
- init();
+ PostgreSQLPortal portal =
connectionContext.createPortal(packet.getPortal(), packet.getSql(),
packet.getParameters(), packet.getResultFormats(), backendConnection);
List<DatabasePacket<?>> result = new LinkedList<>();
result.add(new PostgreSQLBindCompletePacket());
- if (null == databaseCommunicationEngine && null ==
textProtocolBackendHandler) {
- return result;
- }
- ResponseHeader responseHeader = null != databaseCommunicationEngine ?
databaseCommunicationEngine.execute() : textProtocolBackendHandler.execute();
- if (responseHeader instanceof QueryResponseHeader &&
connectionContext.getDescribeExecutor().isPresent()) {
-
connectionContext.getDescribeExecutor().get().setRowDescriptionPacket(getRowDescriptionPacket((QueryResponseHeader)
responseHeader));
+ ResponseHeader responseHeader = portal.execute();
+ if (responseHeader instanceof QueryResponseHeader) {
+ connectionContext.getDescribeExecutor().ifPresent(describeExecutor
->
describeExecutor.setRowDescriptionPacket(createRowDescriptionPacket((QueryResponseHeader)
responseHeader)));
}
if (responseHeader instanceof UpdateResponseHeader) {
- responseType = ResponseType.UPDATE;
connectionContext.setUpdateCount(((UpdateResponseHeader)
responseHeader).getUpdateCount());
}
return result;
}
- private void init() throws SQLException {
- SQLStatement sqlStatement = getSqlStatement();
- if (sqlStatement instanceof TCLStatement || sqlStatement instanceof
EmptyStatement) {
- textProtocolBackendHandler =
TextProtocolBackendHandlerFactory.newInstance(DatabaseTypeRegistry.getActualDatabaseType("PostgreSQL"),
packet.getSql(), backendConnection);
- return;
- }
- databaseCommunicationEngine =
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatement,
packet.getSql(), packet.getParameters(), backendConnection);
- }
-
- private SQLStatement getSqlStatement() {
- return connectionContext.getSqlStatement().orElseGet(() -> {
- SQLStatement result = parseSql(packet.getSql(),
backendConnection.getSchemaName());
- connectionContext.setSqlStatement(result);
- return result;
- });
- }
-
- private SQLStatement parseSql(final String sql, final String schemaName) {
- if (sql.isEmpty()) {
- return new EmptyStatement();
- }
- ShardingSphereSQLParserEngine sqlStatementParserEngine = new
ShardingSphereSQLParserEngine(
-
DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType()));
- return sqlStatementParserEngine.parse(sql, true);
- }
-
- private PostgreSQLRowDescriptionPacket getRowDescriptionPacket(final
QueryResponseHeader queryResponseHeader) {
- responseType = ResponseType.QUERY;
+ private PostgreSQLRowDescriptionPacket createRowDescriptionPacket(final
QueryResponseHeader queryResponseHeader) {
Collection<PostgreSQLColumnDescription> columnDescriptions =
createColumnDescriptions(queryResponseHeader);
return new PostgreSQLRowDescriptionPacket(columnDescriptions.size(),
columnDescriptions);
}
@@ -135,35 +77,4 @@ public final class PostgreSQLComBindExecutor implements
QueryCommandExecutor {
}
return result;
}
-
- @Override
- public boolean next() throws SQLException {
- return null != databaseCommunicationEngine &&
databaseCommunicationEngine.next();
- }
-
- @Override
- public PostgreSQLPacket getQueryRowPacket() throws SQLException {
- QueryResponseRow queryResponseRow =
databaseCommunicationEngine.getQueryResponseRow();
- return new PostgreSQLDataRowPacket(getData(queryResponseRow));
- }
-
- private List<Object> getData(final QueryResponseRow queryResponseRow) {
- Collection<QueryResponseCell> cells = queryResponseRow.getCells();
- List<Object> result = new ArrayList<>(cells.size());
- List<QueryResponseCell> columns = new ArrayList<>(cells);
- for (int i = 0; i < columns.size(); i++) {
- PostgreSQLValueFormat format = determineValueFormat(i);
- result.add(PostgreSQLValueFormat.BINARY == format ?
createBinaryCell(columns.get(i)) : columns.get(i).getData());
- }
- return result;
- }
-
- private PostgreSQLValueFormat determineValueFormat(final int columnIndex) {
- List<PostgreSQLValueFormat> resultFormats = packet.getResultFormats();
- return resultFormats.isEmpty() ? PostgreSQLValueFormat.TEXT :
resultFormats.get(columnIndex % resultFormats.size());
- }
-
- private BinaryCell createBinaryCell(final QueryResponseCell cell) {
- return new
BinaryCell(PostgreSQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell)
cell).getJdbcType()), cell.getData());
- }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutor.java
index 6504938..e0b8532 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutor.java
@@ -19,15 +19,13 @@ package
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLErrorCode;
-import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLMessageSeverityLevel;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.ConnectionScopeBinaryStatementRegistry;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLCloseCompletePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
+import
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import java.sql.SQLException;
import java.util.Collection;
@@ -39,6 +37,8 @@ import java.util.Collections;
@RequiredArgsConstructor
public final class PostgreSQLComCloseExecutor implements CommandExecutor {
+ private final PostgreSQLConnectionContext connectionContext;
+
private final PostgreSQLComClosePacket packet;
private final BackendConnection backendConnection;
@@ -47,25 +47,25 @@ public final class PostgreSQLComCloseExecutor implements
CommandExecutor {
public Collection<DatabasePacket<?>> execute() throws SQLException {
switch (packet.getType()) {
case PREPARED_STATEMENT:
- return closePreparedStatement();
+ closePreparedStatement();
+ break;
case PORTAL:
- return closePortal();
+ closePortal();
+ break;
default:
throw new
UnsupportedOperationException(packet.getType().name());
}
+ return Collections.singletonList(new PostgreSQLCloseCompletePacket());
}
- private Collection<DatabasePacket<?>> closePreparedStatement() {
+ private void closePreparedStatement() {
ConnectionScopeBinaryStatementRegistry binaryStatementRegistry =
PostgreSQLBinaryStatementRegistry.getInstance().get(backendConnection.getConnectionId());
if (null != binaryStatementRegistry) {
binaryStatementRegistry.closeStatement(packet.getName());
}
- return Collections.singletonList(new PostgreSQLCloseCompletePacket());
}
- private Collection<DatabasePacket<?>> closePortal() {
- PostgreSQLErrorResponsePacket packet =
PostgreSQLErrorResponsePacket.newBuilder(
- PostgreSQLMessageSeverityLevel.ERROR,
PostgreSQLErrorCode.FEATURE_NOT_SUPPORTED, "Not implemented: Close
portal").build();
- return Collections.singleton(packet);
+ private void closePortal() throws SQLException {
+ connectionContext.closePortal(packet.getName());
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
index 1e4ba7e..af6d6e9 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
@@ -20,19 +20,21 @@ package
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
+import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLComExecutePacket;
+import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLPortalSuspendedPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket;
import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
-import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
+import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.PostgreSQLPortal;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackStatement;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.Optional;
@@ -40,51 +42,48 @@ import java.util.Optional;
* Command execute executor for PostgreSQL.
*/
@RequiredArgsConstructor
-public final class PostgreSQLComExecuteExecutor implements
QueryCommandExecutor {
+public final class PostgreSQLComExecuteExecutor implements CommandExecutor {
private final PostgreSQLConnectionContext connectionContext;
- private final Collection<QueryCommandExecutor> queryCommandExecutors = new
LinkedList<>();
+ private final PostgreSQLComExecutePacket packet;
private long dataRows;
- private boolean commandComplete;
-
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
Collection<DatabasePacket<?>> result = new LinkedList<>();
for (CommandExecutor each : connectionContext.getPendingExecutors()) {
- if (each instanceof QueryCommandExecutor) {
- queryCommandExecutors.add((QueryCommandExecutor) each);
- }
result.addAll(each.execute());
}
connectionContext.getPendingExecutors().clear();
+ result.addAll(doExecute());
+ result.add(createExecutionCompletedPacket());
return result;
}
- @Override
- public ResponseType getResponseType() {
- return ResponseType.QUERY;
+ private Collection<? extends DatabasePacket<?>> doExecute() throws
SQLException {
+ Collection<DatabasePacket<?>> result = new LinkedList<>();
+ while (!reachedMaxRows()) {
+ Optional<DatabasePacket<?>> packet = getPacketFromPortal();
+ if (!packet.isPresent()) {
+ break;
+ }
+ dataRows++;
+ result.add(packet.get());
+ }
+ return result;
}
- @Override
- public boolean next() throws SQLException {
- return !commandComplete;
+ private Optional<DatabasePacket<?>> getPacketFromPortal() throws
SQLException {
+ PostgreSQLPortal portal =
connectionContext.getPortal(packet.getPortal());
+ return portal.next() ? Optional.of(portal.nextPacket()) :
Optional.empty();
}
- @Override
- public DatabasePacket<?> getQueryRowPacket() throws SQLException {
- Optional<DatabasePacket<?>> result =
getPacketFromQueryCommandExecutors();
- if (result.isPresent()) {
- dataRows++;
- return result.get();
+ private PostgreSQLIdentifierPacket createExecutionCompletedPacket() {
+ if (reachedMaxRows()) {
+ return new PostgreSQLPortalSuspendedPacket();
}
- return createCommandCompletePacket();
- }
-
- private PostgreSQLIdentifierPacket createCommandCompletePacket() {
- commandComplete = true;
if
(connectionContext.getSqlStatement().map(EmptyStatement.class::isInstance).orElse(false))
{
return new PostgreSQLEmptyQueryResponsePacket();
}
@@ -94,16 +93,18 @@ public final class PostgreSQLComExecuteExecutor implements
QueryCommandExecutor
return result;
}
- private Optional<DatabasePacket<?>> getPacketFromQueryCommandExecutors()
throws SQLException {
- Iterator<QueryCommandExecutor> iterator =
queryCommandExecutors.iterator();
- while (iterator.hasNext()) {
- QueryCommandExecutor next = iterator.next();
- if (next.next()) {
- return Optional.of(next.getQueryRowPacket());
- } else {
- iterator.remove();
- }
+ private boolean reachedMaxRows() {
+ return packet.getMaxRows() > 0 && dataRows == packet.getMaxRows();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ if (!reachedMaxRows()) {
+ connectionContext.getPortal(packet.getPortal()).close();
+ }
+ if (connectionContext.getSqlStatement().isPresent() &&
+ (connectionContext.getSqlStatement().get() instanceof
CommitStatement || connectionContext.getSqlStatement().get() instanceof
RollbackStatement)) {
+ connectionContext.closeAllPortals();
}
- return Optional.empty();
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
index 5e11134..6bd5074 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
@@ -36,8 +36,12 @@ import
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
import
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackStatement;
import java.sql.SQLException;
import java.util.Collection;
@@ -49,12 +53,15 @@ import java.util.LinkedList;
*/
public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
+ private final PostgreSQLConnectionContext connectionContext;
+
private final TextProtocolBackendHandler textProtocolBackendHandler;
@Getter
private volatile ResponseType responseType;
- public PostgreSQLComQueryExecutor(final PostgreSQLComQueryPacket
comQueryPacket, final BackendConnection backendConnection) throws SQLException {
+ public PostgreSQLComQueryExecutor(final PostgreSQLConnectionContext
connectionContext, final PostgreSQLComQueryPacket comQueryPacket, final
BackendConnection backendConnection) throws SQLException {
+ this.connectionContext = connectionContext;
textProtocolBackendHandler =
TextProtocolBackendHandlerFactory.newInstance(DatabaseTypeRegistry.getActualDatabaseType("PostgreSQL"),
comQueryPacket.getSql(), backendConnection);
}
@@ -84,8 +91,12 @@ public final class PostgreSQLComQueryExecutor implements
QueryCommandExecutor {
}
private PostgreSQLPacket createUpdatePacket(final UpdateResponseHeader
updateResponseHeader) {
- return updateResponseHeader.getSqlStatement() instanceof
EmptyStatement ? new PostgreSQLEmptyQueryResponsePacket()
- : new
PostgreSQLCommandCompletePacket(PostgreSQLCommand.valueOf(updateResponseHeader.getSqlStatement().getClass()).map(Enum::name).orElse(""),
updateResponseHeader.getUpdateCount());
+ SQLStatement sqlStatement = updateResponseHeader.getSqlStatement();
+ if (sqlStatement instanceof CommitStatement || sqlStatement instanceof
RollbackStatement) {
+ connectionContext.closeAllPortals();
+ }
+ return sqlStatement instanceof EmptyStatement ? new
PostgreSQLEmptyQueryResponsePacket()
+ : new
PostgreSQLCommandCompletePacket(PostgreSQLCommand.valueOf(sqlStatement.getClass()).map(Enum::name).orElse(""),
updateResponseHeader.getUpdateCount());
}
@Override
@@ -97,4 +108,9 @@ public final class PostgreSQLComQueryExecutor implements
QueryCommandExecutor {
public PostgreSQLPacket getQueryRowPacket() throws SQLException {
return new
PostgreSQLDataRowPacket(textProtocolBackendHandler.getRowData());
}
+
+ @Override
+ public void close() throws SQLException {
+ textProtocolBackendHandler.close();
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
index ec753b1..25c67ea 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.Res
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
import
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
-import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind.PostgreSQLComBindExecutor;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.sync.PostgreSQLComSyncExecutor;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.text.PostgreSQLComQueryExecutor;
import org.apache.shardingsphere.transaction.core.TransactionType;
@@ -128,13 +127,4 @@ public final class PostgreSQLCommandExecuteEngineTest {
verify(channelHandlerContext).flush();
verify(channelHandlerContext).write(isA(PostgreSQLReadyForQueryPacket.class));
}
-
- @Test
- public void assertWriteQueryDataWithComBindWithUpdateResponse() throws
SQLException {
- PostgreSQLComBindExecutor bindExecutor =
mock(PostgreSQLComBindExecutor.class);
- when(bindExecutor.getResponseType()).thenReturn(ResponseType.UPDATE);
- PostgreSQLCommandExecuteEngine commandExecuteEngine = new
PostgreSQLCommandExecuteEngine();
- boolean actual =
commandExecuteEngine.writeQueryData(channelHandlerContext, backendConnection,
bindExecutor, 0);
- assertFalse(actual);
- }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
index 9ace085..00cd592 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
@@ -17,22 +17,14 @@
package
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
-import
org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe.PostgreSQLComDescribeExecutor;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
@@ -42,9 +34,6 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.sql.JDBCType;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
@@ -53,9 +42,7 @@ import java.util.Optional;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -89,8 +76,6 @@ public final class PostgreSQLComBindExecutorTest {
Collection<DatabasePacket<?>> actual = executor.execute();
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(),
is(instanceOf(PostgreSQLBindCompletePacket.class)));
- assertThat(executor.getResponseType(), is(ResponseType.UPDATE));
- assertFalse(executor.next());
}
@Test
@@ -99,11 +84,9 @@ public final class PostgreSQLComBindExecutorTest {
QueryResponseHeader queryResponseHeader =
mock(QueryResponseHeader.class);
when(databaseCommunicationEngine.execute()).thenReturn(queryResponseHeader);
PostgreSQLComBindExecutor executor = new
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
- setMockFieldIntoExecutor(executor);
Collection<DatabasePacket<?>> actual = executor.execute();
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(),
is(instanceOf(PostgreSQLBindCompletePacket.class)));
- assertThat(executor.getResponseType(), is(ResponseType.QUERY));
verify(queryResponseHeader).getQueryHeaders();
}
@@ -114,66 +97,18 @@ public final class PostgreSQLComBindExecutorTest {
when(queryResponseHeader.getQueryHeaders()).thenReturn(Collections.singletonList(new
QueryHeader("schema", "table", "label", "column", 1, "type", 2, 3, true, true,
true, true)));
when(databaseCommunicationEngine.execute()).thenReturn(queryResponseHeader);
PostgreSQLComBindExecutor executor = new
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
- setMockFieldIntoExecutor(executor);
Collection<DatabasePacket<?>> actual = executor.execute();
assertThat(actual.size(), is(1));
Iterator<DatabasePacket<?>> actualPackets = actual.iterator();
assertThat(actualPackets.next(),
is(instanceOf(PostgreSQLBindCompletePacket.class)));
- assertThat(executor.getResponseType(), is(ResponseType.QUERY));
- }
-
- @Test
- public void assertNext() throws SQLException {
- when(databaseCommunicationEngine.next()).thenReturn(true, false);
- PostgreSQLComBindExecutor executor = new
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
- setMockFieldIntoExecutor(executor);
- assertTrue(executor.next());
- assertFalse(executor.next());
- }
-
- @Test
- public void assertDataRowNotBinary() throws SQLException {
- QueryResponseRow queryResponseRow = mock(QueryResponseRow.class);
-
when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(queryResponseRow);
- PostgreSQLComBindExecutor executor = new
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
- setMockFieldIntoExecutor(executor);
- PostgreSQLPacket actualQueryRowPacket = executor.getQueryRowPacket();
- verify(queryResponseRow).getCells();
- assertThat(actualQueryRowPacket,
is(instanceOf(PostgreSQLDataRowPacket.class)));
- }
-
- @Test
- public void assertDataRowIsBinary() throws SQLException {
-
when(bindPacket.getResultFormats()).thenReturn(Collections.singletonList(PostgreSQLValueFormat.BINARY));
- QueryResponseRow queryResponseRow = mock(QueryResponseRow.class);
-
when(queryResponseRow.getCells()).thenReturn(Collections.singletonList(new
BinaryQueryResponseCell(JDBCType.BIGINT.getVendorTypeNumber(),
Long.MAX_VALUE)));
-
when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(queryResponseRow);
- PostgreSQLComBindExecutor executor = new
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
- setMockFieldIntoExecutor(executor);
- PostgreSQLPacket actualQueryRowPacket = executor.getQueryRowPacket();
- assertThat(actualQueryRowPacket,
is(instanceOf(PostgreSQLDataRowPacket.class)));
- Collection<Object> actualData = ((PostgreSQLDataRowPacket)
actualQueryRowPacket).getData();
- assertThat(actualData.iterator().next(), instanceOf(BinaryCell.class));
}
@Test
public void assertExecuteBindPacketWithUpdateSQL() throws SQLException {
when(databaseCommunicationEngine.execute()).thenReturn(new
UpdateResponseHeader(mock(InsertStatement.class)));
PostgreSQLComBindExecutor executor = new
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
- setMockFieldIntoExecutor(executor);
Collection<DatabasePacket<?>> actual = executor.execute();
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(),
is(instanceOf(PostgreSQLBindCompletePacket.class)));
- assertThat(executor.getResponseType(), is(ResponseType.UPDATE));
- }
-
- @SneakyThrows
- private void setMockFieldIntoExecutor(final PostgreSQLComBindExecutor
executor) {
- Field field =
PostgreSQLComBindExecutor.class.getDeclaredField("databaseCommunicationEngine");
- field.setAccessible(true);
- Field modifiers = Field.class.getDeclaredField("modifiers");
- modifiers.setAccessible(true);
- modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL);
- field.set(executor, databaseCommunicationEngine);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutorTest.java
index 8e747db..4e8453e 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutorTest.java
@@ -21,8 +21,8 @@ import
org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLCloseCompletePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,6 +36,7 @@ import java.util.Random;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -44,6 +45,9 @@ public final class PostgreSQLComCloseExecutorTest {
private static final int CONNECTION_ID = new Random().nextInt() &
Integer.MAX_VALUE;
@Mock
+ private PostgreSQLConnectionContext connectionContext;
+
+ @Mock
private PostgreSQLComClosePacket packet;
@Mock
@@ -59,7 +63,7 @@ public final class PostgreSQLComCloseExecutorTest {
public void assertExecuteClosePreparedStatement() throws SQLException {
when(packet.getType()).thenReturn(PostgreSQLComClosePacket.Type.PREPARED_STATEMENT);
when(packet.getName()).thenReturn("S_1");
- PostgreSQLComCloseExecutor closeExecutor = new
PostgreSQLComCloseExecutor(packet, backendConnection);
+ PostgreSQLComCloseExecutor closeExecutor = new
PostgreSQLComCloseExecutor(connectionContext, packet, backendConnection);
Collection<DatabasePacket<?>> actual = closeExecutor.execute();
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(),
is(instanceOf(PostgreSQLCloseCompletePacket.class)));
@@ -68,9 +72,12 @@ public final class PostgreSQLComCloseExecutorTest {
@Test
public void assertExecuteClosePortal() throws SQLException {
when(packet.getType()).thenReturn(PostgreSQLComClosePacket.Type.PORTAL);
- PostgreSQLComCloseExecutor closeExecutor = new
PostgreSQLComCloseExecutor(packet, backendConnection);
+ String portalName = "C_1";
+ when(packet.getName()).thenReturn(portalName);
+ PostgreSQLComCloseExecutor closeExecutor = new
PostgreSQLComCloseExecutor(connectionContext, packet, backendConnection);
Collection<DatabasePacket<?>> actual = closeExecutor.execute();
assertThat(actual.size(), is(1));
- assertThat(actual.iterator().next(),
is(instanceOf(PostgreSQLErrorResponsePacket.class)));
+ assertThat(actual.iterator().next(),
is(instanceOf(PostgreSQLCloseCompletePacket.class)));
+ verify(connectionContext).closePortal(portalName);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
index dd46dfb..b9d5841 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
@@ -20,12 +20,14 @@ package
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
+import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLComExecutePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
-import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
+import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.PostgreSQLPortal;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -35,13 +37,13 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -52,6 +54,12 @@ public final class PostgreSQLComExecuteExecutorTest {
private PostgreSQLConnectionContext connectionContext;
@Mock
+ private PostgreSQLComExecutePacket packet;
+
+ @Mock
+ private PostgreSQLPortal portal;
+
+ @Mock
private QueryCommandExecutor queryCommandExecutor;
@Mock
@@ -60,35 +68,33 @@ public final class PostgreSQLComExecuteExecutorTest {
@Mock
private PostgreSQLDataRowPacket dataRowPacket;
+ @Before
+ public void setup() {
+ when(packet.getPortal()).thenReturn("");
+ when(connectionContext.getPortal(anyString())).thenReturn(portal);
+ }
+
@Test
public void assertExecuteQuery() throws SQLException {
when(connectionContext.getPendingExecutors()).thenReturn(new
ArrayList<>(Collections.singletonList(queryCommandExecutor)));
when(queryCommandExecutor.execute()).thenReturn(Collections.singletonList(postgreSQLPacket));
- when(queryCommandExecutor.next()).thenReturn(true, false);
- when((PostgreSQLDataRowPacket)
queryCommandExecutor.getQueryRowPacket()).thenReturn(dataRowPacket);
- PostgreSQLComExecuteExecutor actual = new
PostgreSQLComExecuteExecutor(connectionContext);
+ when(portal.next()).thenReturn(true, false);
+ when(portal.nextPacket()).thenReturn(dataRowPacket);
+ PostgreSQLComExecuteExecutor actual = new
PostgreSQLComExecuteExecutor(connectionContext, packet);
Collection<DatabasePacket<?>> actualPackets = actual.execute();
- assertThat(actualPackets.size(), is(1));
- assertThat(actualPackets.iterator().next(), is(postgreSQLPacket));
- assertTrue(actual.next());
- assertThat(actual.getQueryRowPacket(), is(dataRowPacket));
- assertTrue(actual.next());
- assertThat(actual.getQueryRowPacket(),
is(instanceOf(PostgreSQLCommandCompletePacket.class)));
- assertFalse(actual.next());
+ assertThat(actualPackets.size(), is(3));
+ Iterator<DatabasePacket<?>> actualPacketsIterator =
actualPackets.iterator();
+ assertThat(actualPacketsIterator.next(), is(postgreSQLPacket));
+ assertThat(actualPacketsIterator.next(), is(dataRowPacket));
+ assertThat(actualPacketsIterator.next(),
instanceOf(PostgreSQLCommandCompletePacket.class));
}
@Test
public void assertExecuteUpdate() throws SQLException {
when(connectionContext.getSqlStatement()).thenReturn(Optional.of(mock(EmptyStatement.class)));
- PostgreSQLComExecuteExecutor actual = new
PostgreSQLComExecuteExecutor(connectionContext);
- assertTrue(actual.next());
- assertThat(actual.getQueryRowPacket(),
is(instanceOf(PostgreSQLEmptyQueryResponsePacket.class)));
- assertFalse(actual.next());
- }
-
- @Test
- public void assertResponseType() {
- ResponseType actual = new
PostgreSQLComExecuteExecutor(connectionContext).getResponseType();
- assertThat(actual, is(ResponseType.QUERY));
+ when(portal.next()).thenReturn(false);
+ Collection<DatabasePacket<?>> actual = new
PostgreSQLComExecuteExecutor(connectionContext, packet).execute();
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(),
is(instanceOf(PostgreSQLEmptyQueryResponsePacket.class)));
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutorTest.java
index b127d89..d057907 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutorTest.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryH
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
import org.junit.Before;
import org.junit.Test;
@@ -56,6 +57,9 @@ import static org.mockito.Mockito.when;
public final class PostgreSQLComQueryExecutorTest {
@Mock
+ private PostgreSQLConnectionContext connectionContext;
+
+ @Mock
private TextProtocolBackendHandler textProtocolBackendHandler;
private PostgreSQLComQueryExecutor queryExecutor;
@@ -65,7 +69,7 @@ public final class PostgreSQLComQueryExecutorTest {
PostgreSQLComQueryPacket queryPacket =
mock(PostgreSQLComQueryPacket.class);
BackendConnection backendConnection = mock(BackendConnection.class);
when(queryPacket.getSql()).thenReturn("");
- queryExecutor = new PostgreSQLComQueryExecutor(queryPacket,
backendConnection);
+ queryExecutor = new PostgreSQLComQueryExecutor(connectionContext,
queryPacket, backendConnection);
setMockFieldIntoExecutor(queryExecutor);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
index c1aa8a4..cc94e31 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
@@ -34,4 +34,12 @@ public interface CommandExecutor {
* @throws SQLException SQL exception
*/
Collection<DatabasePacket<?>> execute() throws SQLException;
+
+ /**
+ * Close command executor.
+ *
+ * @throws SQLException SQL exception
+ */
+ default void close() throws SQLException {
+ }
}