This is an automated email from the ASF dual-hosted git repository.
terrymanu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dd873cc948a Add prepared statement caching mechanism in proxy (#38644)
dd873cc948a is described below
commit dd873cc948a72aec2522418e5eeca25479964c6e
Author: KazenkE <[email protected]>
AuthorDate: Sat Jun 6 00:36:04 2026 +0800
Add prepared statement caching mechanism in proxy (#38644)
* Add prepared statement caching mechanism
* change prepared statement cache to Firebird-only connection-held scope
* verify Firebird connection-held prepared statement cache reuse.
* fix the problem with check spotless.
* Fix same-handle Firebird prepare cleanup and prove reuse on real held
backend path.
* fix the problem with confliction.
* Refine Firebird prepared statement cache cleanup.
* fix Firebird cleanup order and add held-handler regressions.
---
RELEASE-NOTES.md | 1 +
.../connector/ProxyDatabaseConnectionManager.java | 11 +
.../connector/StandardDatabaseProxyConnector.java | 8 +
.../jdbc/statement/JDBCBackendStatement.java | 28 +-
.../proxy/backend/session/ConnectionSession.java | 40 ++-
.../session/PreparedStatementCacheContext.java | 220 ++++++++++++++
.../backend/session/PreparedStatementCacheKey.java | 37 +++
.../ProxyDatabaseConnectionManagerTest.java | 62 +++-
.../StandardDatabaseProxyConnectorTest.java | 316 ++++++++++++++++++++-
.../jdbc/statement/JDBCBackendStatementTest.java | 108 ++++++-
.../backend/session/ConnectionSessionTest.java | 24 ++
.../session/PreparedStatementCacheContextTest.java | 140 +++++++++
.../session/PreparedStatementCacheKeyTest.java | 40 +++
.../FirebirdStatementResourceCleaner.java | 79 ++++++
.../FirebirdExecuteStatementCommandExecutor.java | 44 +--
.../fetch/FirebirdFetchStatementCache.java | 13 +-
.../free/FirebirdFreeStatementCommandExecutor.java | 12 +-
.../FirebirdPrepareStatementCommandExecutor.java | 10 +-
.../FirebirdStatementResourceCleanerTest.java | 157 ++++++++++
...irebirdExecuteStatementCommandExecutorTest.java | 6 +
.../fetch/FirebirdFetchStatementCacheTest.java | 11 +
.../FirebirdFetchStatementCommandExecutorTest.java | 25 +-
.../FirebirdFreeStatementCommandExecutorTest.java | 53 ++--
...irebirdPrepareStatementCommandExecutorTest.java | 49 +++-
24 files changed, 1422 insertions(+), 72 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 38ae7ae3675..e910c843972 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -48,6 +48,7 @@
1. Sharding: Fix HASH_MOD routing mismatch for same negative numeric values
across numeric Java types with compatibility switch
`normalize-numeric-int-range` -
[#38327](https://github.com/apache/shardingsphere/pull/38327)
1. Proxy: Support non column projection for MySQL prepared statement in Proxy
- [#38507](https://github.com/apache/shardingsphere/pull/38507)
1. Proxy: Support driverClassName config in proxy storage unit to solve mysql
and mariadb jdbc url conflict -
[#38582](https://github.com/apache/shardingsphere/pull/38582)
+1. Proxy: Support Firebird prepared statement cache reuse for held connections
- [#38644](https://github.com/apache/shardingsphere/pull/38644)
## Release 5.5.3
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
index 68c7901dbc7..b65f2a70589 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
@@ -248,6 +248,16 @@ public final class ProxyDatabaseConnectionManager
implements DatabaseConnectionM
inUseProxyBackendHandlers.remove(handler);
}
+ /**
+ * Remove proxy backend handler resource.
+ *
+ * @param handler proxy backend handler to be removed
+ */
+ public void removeResource(final ProxyBackendHandler handler) {
+ proxyBackendHandlers.remove(handler);
+ inUseProxyBackendHandlers.remove(handler);
+ }
+
/**
* Handle auto commit.
*/
@@ -349,6 +359,7 @@ public final class ProxyDatabaseConnectionManager
implements DatabaseConnectionM
}
cachedConnections.clear();
}
+ connectionSession.getPreparedStatementCacheContext().closeAll();
if (!forceRollback) {
connectionPostProcessors.clear();
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnector.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnector.java
index 3ad376a8aae..3076ab1fded 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnector.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnector.java
@@ -85,6 +85,7 @@ import
org.apache.shardingsphere.transaction.api.TransactionType;
import
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -393,6 +394,9 @@ public final class StandardDatabaseProxyConnector
implements DatabaseProxyConnec
private Collection<SQLException> closeStatements() {
Collection<SQLException> result = new LinkedList<>();
for (Statement each : cachedStatements) {
+ if (isCachedPreparedStatement(each)) {
+ continue;
+ }
try {
each.cancel();
each.close();
@@ -404,6 +408,10 @@ public final class StandardDatabaseProxyConnector
implements DatabaseProxyConnec
return result;
}
+ private boolean isCachedPreparedStatement(final Statement statement) {
+ return statement instanceof PreparedStatement &&
databaseConnectionManager.getConnectionSession().getPreparedStatementCacheContext().contains(statement);
+ }
+
private Optional<SQLException> closeSQLFederationEngine() {
if (null != proxySQLExecutor.getSqlFederationEngine()) {
try {
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
index 40a052e5148..36fc7af8580 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
@@ -24,6 +24,9 @@ import
org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheContext;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -33,12 +36,19 @@ import java.sql.Types;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.Objects;
/**
* JDBC backend statement.
*/
public final class JDBCBackendStatement implements
ExecutorJDBCStatementManager {
+ private final ConnectionSession connectionSession;
+
+ public JDBCBackendStatement(final ConnectionSession connectionSession) {
+ this.connectionSession = Objects.requireNonNull(connectionSession,
"connectionSession cannot be null.");
+ }
+
@Override
public Statement createStorageResource(final Connection connection, final
ConnectionMode connectionMode, final StatementOption option, final DatabaseType
databaseType) throws SQLException {
Statement result = connection.createStatement();
@@ -53,9 +63,8 @@ public final class JDBCBackendStatement implements
ExecutorJDBCStatementManager
final ConnectionMode
connectionMode, final StatementOption option, final DatabaseType databaseType)
throws SQLException {
String sql = executionUnit.getSqlUnit().getSql();
List<Object> params = executionUnit.getSqlUnit().getParameters();
- PreparedStatement result = option.isReturnGeneratedKeys()
- ?
connection.prepareStatement(executionUnit.getSqlUnit().getSql(),
Statement.RETURN_GENERATED_KEYS)
- : connection.prepareStatement(sql);
+ PreparedStatement result = getPreparedStatement(connection, sql,
option.isReturnGeneratedKeys());
+ result.clearParameters();
Iterator<Object> paramIterator = params.iterator();
int index = 0;
while (paramIterator.hasNext()) {
@@ -73,6 +82,19 @@ public final class JDBCBackendStatement implements
ExecutorJDBCStatementManager
return result;
}
+ private PreparedStatement getPreparedStatement(final Connection
connection, final String sql, final boolean returnGeneratedKeys) throws
SQLException {
+ Optional<PreparedStatementCacheKey> preparedStatementCacheKey =
connectionSession.getCurrentPreparedStatementCacheKey();
+ if (!preparedStatementCacheKey.isPresent()) {
+ return createPreparedStatement(connection, sql,
returnGeneratedKeys);
+ }
+ PreparedStatementCacheContext cacheContext =
connectionSession.getPreparedStatementCacheContext();
+ return cacheContext.getOrCreate(connection, sql, returnGeneratedKeys,
preparedStatementCacheKey.get(), () -> createPreparedStatement(connection, sql,
returnGeneratedKeys));
+ }
+
+ private PreparedStatement createPreparedStatement(final Connection
connection, final String sql, final boolean returnGeneratedKeys) throws
SQLException {
+ return returnGeneratedKeys ? connection.prepareStatement(sql,
Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql);
+ }
+
private void setFetchSize(final Statement statement, final DatabaseType
databaseType) throws SQLException {
Optional<StatementMemoryStrictlyFetchSizeSetter> fetchSizeSetter =
DatabaseTypedSPILoader.findService(StatementMemoryStrictlyFetchSizeSetter.class,
databaseType);
if (fetchSizeSetter.isPresent()) {
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
index d3a6e28989f..cc273f83486 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
@@ -65,10 +65,14 @@ public final class ConnectionSession {
@SuppressWarnings("rawtypes")
private final ExecutorStatementManager statementManager;
+ private final PreparedStatementCacheContext preparedStatementCacheContext
= new PreparedStatementCacheContext();
+
private final ServerPreparedStatementRegistry
serverPreparedStatementRegistry = new ServerPreparedStatementRegistry();
private final AtomicReference<ConnectionContext> connectionContext = new
AtomicReference<>();
+ private final AtomicReference<Optional<PreparedStatementCacheKey>>
currentPreparedStatementCacheKey = new AtomicReference<>(Optional.empty());
+
private final RequiredSessionVariableRecorder
requiredSessionVariableRecorder = new RequiredSessionVariableRecorder();
private volatile String processId;
@@ -80,7 +84,7 @@ public final class ConnectionSession {
transactionStatus = new TransactionStatus();
this.attributeMap = attributeMap;
databaseConnectionManager = new ProxyDatabaseConnectionManager(this);
- statementManager = new JDBCBackendStatement();
+ statementManager = new JDBCBackendStatement(this);
}
/**
@@ -131,6 +135,40 @@ public final class ConnectionSession {
return Optional.ofNullable(isolationLevel);
}
+ /**
+ * Begin prepared statement cache.
+ *
+ * @param cacheKey prepared statement cache key
+ */
+ public void beginPreparedStatementCache(final PreparedStatementCacheKey
cacheKey) {
+ currentPreparedStatementCacheKey.set(Optional.of(cacheKey));
+ }
+
+ /**
+ * Get current prepared statement cache key.
+ *
+ * @return current prepared statement cache key
+ */
+ public Optional<PreparedStatementCacheKey>
getCurrentPreparedStatementCacheKey() {
+ return currentPreparedStatementCacheKey.get();
+ }
+
+ /**
+ * Finish prepared statement cache.
+ */
+ public void finishPreparedStatementCache() {
+ currentPreparedStatementCacheKey.set(Optional.empty());
+ }
+
+ /**
+ * Invalidate prepared statement cache.
+ *
+ * @param cacheKey prepared statement cache key
+ */
+ public void invalidatePreparedStatementCache(final
PreparedStatementCacheKey cacheKey) {
+ preparedStatementCacheContext.invalidate(cacheKey);
+ }
+
/**
* Clear query context.
*/
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContext.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContext.java
new file mode 100644
index 00000000000..8704474a7c6
--- /dev/null
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContext.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import lombok.Getter;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Prepared statement cache context.
+ */
+public final class PreparedStatementCacheContext {
+
+ private static final int DEFAULT_MAX_SIZE = 128;
+
+ @Getter
+ private final int maxSize;
+
+ private final Map<CacheKey, PreparedStatement> cachedPreparedStatements =
new LinkedHashMap<>(16, 0.75F, true);
+
+ public PreparedStatementCacheContext() {
+ this(DEFAULT_MAX_SIZE);
+ }
+
+ public PreparedStatementCacheContext(final int maxSize) {
+ this.maxSize = Math.max(1, maxSize);
+ }
+
+ /**
+ * Get or create prepared statement.
+ *
+ * @param connection connection
+ * @param sql SQL
+ * @param returnGeneratedKeys return generated keys flag
+ * @param preparedStatementCacheKey prepared statement cache key
+ * @param supplier prepared statement supplier
+ * @return prepared statement
+ * @throws SQLException SQL exception
+ */
+ public synchronized PreparedStatement getOrCreate(final Connection
connection, final String sql, final boolean returnGeneratedKeys,
+ final
PreparedStatementCacheKey preparedStatementCacheKey, final
PreparedStatementSupplier supplier) throws SQLException {
+ CacheKey cacheKey = new CacheKey(connection, sql, returnGeneratedKeys,
preparedStatementCacheKey);
+ PreparedStatement cachedPreparedStatement =
cachedPreparedStatements.get(cacheKey);
+ if (null != cachedPreparedStatement &&
!isClosed(cachedPreparedStatement)) {
+ return cachedPreparedStatement;
+ }
+ if (null != cachedPreparedStatement) {
+ cachedPreparedStatements.remove(cacheKey);
+ closeQuietly(cachedPreparedStatement);
+ }
+ PreparedStatement result = supplier.get();
+ cachedPreparedStatements.put(cacheKey, result);
+ evictIfNecessary();
+ return result;
+ }
+
+ /**
+ * Check whether statement is cached.
+ *
+ * @param statement statement
+ * @return cached or not
+ */
+ public synchronized boolean contains(final Statement statement) {
+ return cachedPreparedStatements.containsValue(statement);
+ }
+
+ /**
+ * Invalidate cached statement.
+ *
+ * @param statement statement
+ */
+ public synchronized void invalidate(final Statement statement) {
+ Iterator<Entry<CacheKey, PreparedStatement>> iterator =
cachedPreparedStatements.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<CacheKey, PreparedStatement> entry = iterator.next();
+ if (entry.getValue() == statement) {
+ iterator.remove();
+ closeQuietly(entry.getValue());
+ break;
+ }
+ }
+ }
+
+ /**
+ * Invalidate cached statements by prepared statement cache key.
+ *
+ * @param preparedStatementCacheKey prepared statement cache key
+ */
+ public synchronized void invalidate(final PreparedStatementCacheKey
preparedStatementCacheKey) {
+ Iterator<Entry<CacheKey, PreparedStatement>> iterator =
cachedPreparedStatements.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<CacheKey, PreparedStatement> entry = iterator.next();
+ if
(preparedStatementCacheKey.equals(entry.getKey().preparedStatementCacheKey)) {
+ iterator.remove();
+ closeQuietly(entry.getValue());
+ }
+ }
+ }
+
+ /**
+ * Close all cached statements.
+ */
+ public synchronized void closeAll() {
+ for (PreparedStatement each : cachedPreparedStatements.values()) {
+ closeQuietly(each);
+ }
+ cachedPreparedStatements.clear();
+ }
+
+ /**
+ * Get cache size.
+ *
+ * @return cache size
+ */
+ public synchronized int size() {
+ return cachedPreparedStatements.size();
+ }
+
+ private void evictIfNecessary() {
+ while (cachedPreparedStatements.size() > maxSize) {
+ Iterator<Entry<CacheKey, PreparedStatement>> iterator =
cachedPreparedStatements.entrySet().iterator();
+ if (!iterator.hasNext()) {
+ break;
+ }
+ Entry<CacheKey, PreparedStatement> eldest = iterator.next();
+ iterator.remove();
+ closeQuietly(eldest.getValue());
+ }
+ }
+
+ private boolean isClosed(final PreparedStatement preparedStatement) {
+ try {
+ return preparedStatement.isClosed();
+ } catch (final SQLException ignored) {
+ return true;
+ }
+ }
+
+ private void closeQuietly(final PreparedStatement preparedStatement) {
+ try {
+ preparedStatement.close();
+ } catch (final SQLException ignored) {
+ }
+ }
+
+ @FunctionalInterface
+ public interface PreparedStatementSupplier {
+
+ /**
+ * Get prepared statement.
+ *
+ * @return prepared statement
+ * @throws SQLException SQL exception
+ */
+ PreparedStatement get() throws SQLException;
+ }
+
+ private static final class CacheKey {
+
+ private final Connection connection;
+
+ private final String sql;
+
+ private final boolean returnGeneratedKeys;
+
+ private final PreparedStatementCacheKey preparedStatementCacheKey;
+
+ private final int connectionIdentityHash;
+
+ private CacheKey(final Connection connection, final String sql, final
boolean returnGeneratedKeys, final PreparedStatementCacheKey
preparedStatementCacheKey) {
+ this.connection = connection;
+ this.sql = sql;
+ this.returnGeneratedKeys = returnGeneratedKeys;
+ this.preparedStatementCacheKey =
Objects.requireNonNull(preparedStatementCacheKey, "preparedStatementCacheKey
cannot be null.");
+ connectionIdentityHash = System.identityHashCode(connection);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (!(obj instanceof CacheKey)) {
+ return false;
+ }
+ CacheKey other = (CacheKey) obj;
+ return connection == other.connection && returnGeneratedKeys ==
other.returnGeneratedKeys && sql.equals(other.sql)
+ &&
preparedStatementCacheKey.equals(other.preparedStatementCacheKey);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = connectionIdentityHash;
+ result = 31 * result + sql.hashCode();
+ result = 31 * result + Boolean.hashCode(returnGeneratedKeys);
+ result = 31 * result + preparedStatementCacheKey.hashCode();
+ return result;
+ }
+ }
+}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKey.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKey.java
new file mode 100644
index 00000000000..5f9b4ed0805
--- /dev/null
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKey.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import java.util.Objects;
+
+/**
+ * Prepared statement cache key.
+ */
+@Getter
+@EqualsAndHashCode
+public final class PreparedStatementCacheKey {
+
+ private final String token;
+
+ public PreparedStatementCacheKey(final String token) {
+ this.token = Objects.requireNonNull(token, "token cannot be null.");
+ }
+}
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManagerTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManagerTest.java
index 30726adfb8c..d774d3c825e 100644
---
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManagerTest.java
+++
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManagerTest.java
@@ -22,7 +22,10 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -43,6 +46,8 @@ import
org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheContext;
import
org.apache.shardingsphere.proxy.backend.session.RequiredSessionVariableRecorder;
import
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
import
org.apache.shardingsphere.sql.parser.statement.core.enums.TransactionIsolationLevel;
@@ -67,6 +72,7 @@ import org.mockito.quality.Strictness;
import java.lang.reflect.Field;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
@@ -95,6 +101,7 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -125,9 +132,10 @@ class ProxyDatabaseConnectionManagerTest {
when(connectionSession.getConnectionContext().getTransactionContext()).thenReturn(new
TransactionConnectionContext());
when(connectionSession.getTransactionStatus()).thenReturn(new
TransactionStatus());
when(connectionSession.getUsedDatabaseName()).thenReturn(String.format(SCHEMA_PATTERN,
0));
+
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.empty());
databaseConnectionManager = new
ProxyDatabaseConnectionManager(connectionSession);
when(connectionSession.getDatabaseConnectionManager()).thenReturn(databaseConnectionManager);
- JDBCBackendStatement backendStatement = new JDBCBackendStatement();
+ JDBCBackendStatement backendStatement = new
JDBCBackendStatement(connectionSession);
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
when(connectionSession.getRequiredSessionVariableRecorder()).thenReturn(new
RequiredSessionVariableRecorder());
}
@@ -261,6 +269,18 @@ class ProxyDatabaseConnectionManagerTest {
verifyConnectionPostProcessorsEmpty();
}
+ @Test
+ void assertCloseConnectionsClosesPreparedStatementCache() throws
SQLException {
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ Connection connection = prepareCachedConnections();
+ cacheContext.getOrCreate(connection, "SELECT 1", false, new
PreparedStatementCacheKey("statement-1"), () -> preparedStatement);
+
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+ databaseConnectionManager.closeConnections(false);
+ verify(preparedStatement).close();
+ assertThat(cacheContext.size(), is(0));
+ }
+
@SuppressWarnings("unchecked")
@SneakyThrows(ReflectiveOperationException.class)
private void verifyConnectionPostProcessorsEmpty() {
@@ -444,6 +464,18 @@ class ProxyDatabaseConnectionManagerTest {
assertTrue(actual.isEmpty());
}
+ @Test
+ void assertRemoveDatabaseProxyConnectorResource() {
+ ProxyBackendHandler engine = mock(DatabaseProxyConnector.class);
+ Collection<ProxyBackendHandler> backendHandlers =
getProxyBackendHandlers();
+ Collection<ProxyBackendHandler> inUseProxyBackendHandlers =
getInUseProxyBackendHandlers();
+ backendHandlers.add(engine);
+ inUseProxyBackendHandlers.add(engine);
+ databaseConnectionManager.removeResource(engine);
+ assertFalse(backendHandlers.contains(engine));
+ assertFalse(inUseProxyBackendHandlers.contains(engine));
+ }
+
@Test
void assertCloseHandlers() throws SQLException {
ProxyBackendHandler engine = mock(DatabaseProxyConnector.class);
@@ -496,6 +528,34 @@ class ProxyDatabaseConnectionManagerTest {
assertThat(getInUseProxyBackendHandlers(),
is(Collections.singleton(inUseHandler)));
}
+ @Test
+ void
assertCloseExecutionResourcesKeepsFirebirdPreparedStatementReuseInHeldTransaction()
throws SQLException, BackendConnectionException {
+ connectionSession.getTransactionStatus().setInTransaction(true);
+
connectionSession.getConnectionContext().getTransactionContext().beginTransaction(TransactionType.XA.name(),
null);
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.of(new
PreparedStatementCacheKey("statement-1")));
+ DatabaseType firebirdDatabaseType = mock(DatabaseType.class);
+ JDBCBackendStatement backendStatement = new
JDBCBackendStatement(connectionSession);
+ Connection cachedConnection = prepareCachedConnections();
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ when(cachedConnection.prepareStatement("SELECT
1")).thenReturn(preparedStatement);
+ StatementOption statementOption = new StatementOption(false);
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit("SELECT 1",
Collections.singletonList(1))),
+ cachedConnection, 0, ConnectionMode.CONNECTION_STRICTLY,
statementOption, firebirdDatabaseType);
+ databaseConnectionManager.closeExecutionResources();
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit("SELECT 1",
Collections.singletonList(2))),
+ cachedConnection, 0, ConnectionMode.CONNECTION_STRICTLY,
statementOption, firebirdDatabaseType);
+ verify(cachedConnection, times(1)).prepareStatement("SELECT 1");
+ verify(preparedStatement, times(2)).clearParameters();
+ verify(preparedStatement).setObject(1, 1);
+ verify(preparedStatement).setObject(1, 2);
+ verify(cachedConnection, never()).close();
+ assertThat(cacheContext.size(), is(1));
+ }
+
@Test
void assertCloseExecutionResourcesInTransactionWhenClosed() throws
BackendConnectionException, SQLException {
connectionSession.getTransactionStatus().setInTransaction(true);
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnectorTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnectorTest.java
index d7e2c9e0ad9..446ff4fd765 100644
---
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnectorTest.java
+++
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnectorTest.java
@@ -32,12 +32,15 @@ import
org.apache.shardingsphere.infra.binder.context.statement.type.ddl.CursorH
import
org.apache.shardingsphere.infra.binder.context.statement.type.ddl.CursorStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.type.dml.InsertStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import
org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.EmptyStorageUnitException;
import
org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
@@ -49,6 +52,7 @@ import
org.apache.shardingsphere.infra.merge.result.impl.memory.MemoryMergedResu
import
org.apache.shardingsphere.infra.merge.result.impl.memory.MemoryQueryResultRow;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
@@ -72,15 +76,20 @@ import
org.apache.shardingsphere.mode.metadata.refresher.pushdown.PushDownMetaDa
import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import
org.apache.shardingsphere.proxy.backend.connector.jdbc.fixture.QueryHeaderBuilderFixture;
+import
org.apache.shardingsphere.proxy.backend.connector.jdbc.datasource.JDBCBackendDataSource;
+import
org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.ProxyJDBCExecutor;
import
org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
import
org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.ProxyBackendTransactionManager;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilder;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.sql.parser.engine.api.CacheOption;
import
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.cursor.CursorNameSegment;
@@ -95,6 +104,7 @@ import
org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
import
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
import
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
import org.junit.jupiter.api.AfterEach;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -107,6 +117,8 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -118,6 +130,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -128,14 +141,19 @@ import static
org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -146,6 +164,8 @@ class StandardDatabaseProxyConnectorTest {
private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
+ private final DatabaseType firebirdDatabaseType =
TypedSPILoader.getService(DatabaseType.class, "Firebird");
+
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ProxyDatabaseConnectionManager databaseConnectionManager;
@@ -497,6 +517,184 @@ class StandardDatabaseProxyConnectorTest {
}
}
+ @Test
+ void assertExecuteWithFirebirdPreparedStatementReuseInHeldConnection()
throws SQLException, BackendConnectionException {
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ FirebirdPreparedStatementExecutionTestContext testContext =
createFirebirdPreparedStatementExecutionTestContext("UPDATE tbl SET col = ?",
connection);
+ when(connection.prepareStatement("UPDATE tbl SET col =
?")).thenReturn(preparedStatement);
+ ExecutionContext firstExecutionContext =
createFirebirdExecutionContext(testContext, 1);
+ ExecutionContext secondExecutionContext =
createFirebirdExecutionContext(testContext, 2);
+ DialectDatabaseMetaData dialectDatabaseMetaData =
mock(DialectDatabaseMetaData.class, RETURNS_DEEP_STUBS);
+ try (
+ MockedConstruction<DatabaseTypeRegistry>
mockedDatabaseTypeRegistry = mockConstruction(DatabaseTypeRegistry.class,
+ (mock, context) -> {
+
when(mock.getDefaultSchemaName("foo_db")).thenReturn("foo_db");
+
when(mock.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData);
+ });
+ MockedConstruction<KernelProcessor> mockedKernelProcessor =
mockConstruction(KernelProcessor.class,
+ (mock, context) ->
when(mock.generateExecutionContext(any(QueryContext.class),
any(RuleMetaData.class), any(ConfigurationProperties.class)))
+ .thenReturn(1 == context.getCount() ?
firstExecutionContext : secondExecutionContext));
+ MockedStatic<ShardingSphereServiceLoader> serviceLoader =
mockStatic(ShardingSphereServiceLoader.class, CALLS_REAL_METHODS)) {
+ DatabaseProxyConnector engine =
createFirebirdPreparedStatementConnector(testContext);
+ serviceLoader.when(() ->
ShardingSphereServiceLoader.getServiceInstances(AdvancedProxySQLExecutor.class)).thenReturn(Collections.emptyList());
+
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+ assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
is(1));
+ testContext.connectionSession.finishPreparedStatementCache();
+
testContext.connectionSession.getDatabaseConnectionManager().closeExecutionResources();
+
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
is(1));
+
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+ assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+ testContext.connectionSession.finishPreparedStatementCache();
+ assertTrue(mockedDatabaseTypeRegistry.constructed().size() >= 3);
+ assertThat(mockedKernelProcessor.constructed().size(), is(2));
+ }
+ verify(connection).prepareStatement("UPDATE tbl SET col = ?");
+ verify(preparedStatement, times(2)).clearParameters();
+ verify(preparedStatement).setObject(1, 1);
+ verify(preparedStatement).setObject(1, 2);
+ verify(connection, never()).close();
+
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
is(1));
+ }
+
+ @Test
+ void
assertExecuteWithNewPreparedStatementAfterFirebirdPreparedStatementInvalidation()
throws SQLException, BackendConnectionException {
+ Connection connection = mock(Connection.class);
+ PreparedStatement firstPreparedStatement =
mock(PreparedStatement.class);
+ PreparedStatement secondPreparedStatement =
mock(PreparedStatement.class);
+ FirebirdPreparedStatementExecutionTestContext testContext =
createFirebirdPreparedStatementExecutionTestContext("UPDATE tbl SET col = ?",
connection);
+ when(connection.prepareStatement("UPDATE tbl SET col =
?")).thenReturn(firstPreparedStatement, secondPreparedStatement);
+ ExecutionContext firstExecutionContext =
createFirebirdExecutionContext(testContext, 1);
+ ExecutionContext secondExecutionContext =
createFirebirdExecutionContext(testContext, 2);
+ DialectDatabaseMetaData dialectDatabaseMetaData =
mock(DialectDatabaseMetaData.class, RETURNS_DEEP_STUBS);
+ try (
+ MockedConstruction<DatabaseTypeRegistry>
mockedDatabaseTypeRegistry = mockConstruction(DatabaseTypeRegistry.class,
+ (mock, context) -> {
+
when(mock.getDefaultSchemaName("foo_db")).thenReturn("foo_db");
+
when(mock.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData);
+ });
+ MockedConstruction<KernelProcessor> mockedKernelProcessor =
mockConstruction(KernelProcessor.class,
+ (mock, context) ->
when(mock.generateExecutionContext(any(QueryContext.class),
any(RuleMetaData.class), any(ConfigurationProperties.class)))
+ .thenReturn(1 == context.getCount() ?
firstExecutionContext : secondExecutionContext));
+ MockedStatic<ShardingSphereServiceLoader> serviceLoader =
mockStatic(ShardingSphereServiceLoader.class, CALLS_REAL_METHODS)) {
+ DatabaseProxyConnector engine =
createFirebirdPreparedStatementConnector(testContext);
+ serviceLoader.when(() ->
ShardingSphereServiceLoader.getServiceInstances(AdvancedProxySQLExecutor.class)).thenReturn(Collections.emptyList());
+
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+ assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+ testContext.connectionSession.finishPreparedStatementCache();
+
testContext.connectionSession.invalidatePreparedStatementCache(createPreparedStatementCacheKey(1));
+
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
is(0));
+
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+ assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+ testContext.connectionSession.finishPreparedStatementCache();
+ assertTrue(mockedDatabaseTypeRegistry.constructed().size() >= 3);
+ assertThat(mockedKernelProcessor.constructed().size(), is(2));
+ }
+ verify(connection, times(2)).prepareStatement("UPDATE tbl SET col =
?");
+ verify(firstPreparedStatement).clearParameters();
+ verify(secondPreparedStatement).clearParameters();
+ verify(firstPreparedStatement).setObject(1, 1);
+ verify(secondPreparedStatement).setObject(1, 2);
+ verify(firstPreparedStatement).close();
+
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
is(1));
+ }
+
+ @Test
+ void assertExecuteWithNewPreparedStatementAfterClosingConnections() throws
SQLException, BackendConnectionException {
+ Connection firstConnection = mock(Connection.class);
+ Connection secondConnection = mock(Connection.class);
+ PreparedStatement firstPreparedStatement =
mock(PreparedStatement.class);
+ PreparedStatement secondPreparedStatement =
mock(PreparedStatement.class);
+ FirebirdPreparedStatementExecutionTestContext testContext =
createFirebirdPreparedStatementExecutionTestContext("UPDATE tbl SET col = ?",
firstConnection, secondConnection);
+ when(firstConnection.prepareStatement("UPDATE tbl SET col =
?")).thenReturn(firstPreparedStatement);
+ when(secondConnection.prepareStatement("UPDATE tbl SET col =
?")).thenReturn(secondPreparedStatement);
+ ExecutionContext firstExecutionContext =
createFirebirdExecutionContext(testContext, 1);
+ ExecutionContext secondExecutionContext =
createFirebirdExecutionContext(testContext, 2);
+ DialectDatabaseMetaData dialectDatabaseMetaData =
mock(DialectDatabaseMetaData.class, RETURNS_DEEP_STUBS);
+ try (
+ MockedConstruction<DatabaseTypeRegistry>
mockedDatabaseTypeRegistry = mockConstruction(DatabaseTypeRegistry.class,
+ (mock, context) -> {
+
when(mock.getDefaultSchemaName("foo_db")).thenReturn("foo_db");
+
when(mock.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData);
+ });
+ MockedConstruction<KernelProcessor> mockedKernelProcessor =
mockConstruction(KernelProcessor.class,
+ (mock, context) ->
when(mock.generateExecutionContext(any(QueryContext.class),
any(RuleMetaData.class), any(ConfigurationProperties.class)))
+ .thenReturn(1 == context.getCount() ?
firstExecutionContext : secondExecutionContext));
+ MockedStatic<ShardingSphereServiceLoader> serviceLoader =
mockStatic(ShardingSphereServiceLoader.class, CALLS_REAL_METHODS)) {
+ DatabaseProxyConnector engine =
createFirebirdPreparedStatementConnector(testContext);
+ serviceLoader.when(() ->
ShardingSphereServiceLoader.getServiceInstances(AdvancedProxySQLExecutor.class)).thenReturn(Collections.emptyList());
+
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+ assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+ testContext.connectionSession.finishPreparedStatementCache();
+
assertThat(testContext.connectionSession.getDatabaseConnectionManager().closeConnections(false),
is(Collections.emptyList()));
+
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
is(0));
+
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+ assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+ testContext.connectionSession.finishPreparedStatementCache();
+ assertTrue(mockedDatabaseTypeRegistry.constructed().size() >= 3);
+ assertThat(mockedKernelProcessor.constructed().size(), is(2));
+ }
+ verify(firstConnection).prepareStatement("UPDATE tbl SET col = ?");
+ verify(secondConnection).prepareStatement("UPDATE tbl SET col = ?");
+ verify(firstPreparedStatement).clearParameters();
+ verify(secondPreparedStatement).clearParameters();
+ verify(firstPreparedStatement).setObject(1, 1);
+ verify(secondPreparedStatement).setObject(1, 2);
+ verify(firstPreparedStatement).close();
+ verify(firstConnection).close();
+
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
is(1));
+ }
+
+ @Test
+ void
assertCloseExecutionResourcesAfterHandlerCleanupAndCacheInvalidation() throws
SQLException, BackendConnectionException {
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ AtomicBoolean closed = new AtomicBoolean(false);
+ doAnswer(invocation -> {
+ if (closed.get()) {
+ throw new SQLException("cancel after close");
+ }
+ return null;
+ }).when(preparedStatement).cancel();
+ doAnswer(invocation -> {
+ closed.set(true);
+ return null;
+ }).when(preparedStatement).close();
+ when(preparedStatement.isClosed()).thenAnswer(invocation ->
closed.get());
+ FirebirdPreparedStatementExecutionTestContext testContext =
createFirebirdPreparedStatementExecutionTestContext("UPDATE tbl SET col = ?",
connection);
+ when(connection.prepareStatement("UPDATE tbl SET col =
?")).thenReturn(preparedStatement);
+ ExecutionContext executionContext =
createFirebirdExecutionContext(testContext, 1);
+ DialectDatabaseMetaData dialectDatabaseMetaData =
mock(DialectDatabaseMetaData.class, RETURNS_DEEP_STUBS);
+ try (
+ MockedConstruction<DatabaseTypeRegistry>
mockedDatabaseTypeRegistry = mockConstruction(DatabaseTypeRegistry.class,
+ (mock, context) -> {
+
when(mock.getDefaultSchemaName("foo_db")).thenReturn("foo_db");
+
when(mock.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData);
+ });
+ MockedConstruction<KernelProcessor> mockedKernelProcessor =
mockConstruction(KernelProcessor.class,
+ (mock, context) ->
when(mock.generateExecutionContext(any(QueryContext.class),
any(RuleMetaData.class),
any(ConfigurationProperties.class))).thenReturn(executionContext));
+ MockedStatic<ShardingSphereServiceLoader> serviceLoader =
mockStatic(ShardingSphereServiceLoader.class, CALLS_REAL_METHODS)) {
+ DatabaseProxyConnector engine =
createFirebirdPreparedStatementConnector(testContext);
+
testContext.connectionSession.getDatabaseConnectionManager().add(engine);
+
testContext.connectionSession.getDatabaseConnectionManager().markResourceInUse(engine);
+ serviceLoader.when(() ->
ShardingSphereServiceLoader.getServiceInstances(AdvancedProxySQLExecutor.class)).thenReturn(Collections.emptyList());
+
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+ assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+ testContext.connectionSession.finishPreparedStatementCache();
+
testContext.connectionSession.getDatabaseConnectionManager().removeResource(engine);
+ engine.close();
+
testContext.connectionSession.invalidatePreparedStatementCache(createPreparedStatementCacheKey(1));
+
testContext.connectionSession.getDatabaseConnectionManager().closeExecutionResources();
+ assertTrue(mockedDatabaseTypeRegistry.constructed().size() >= 2);
+ assertThat(mockedKernelProcessor.constructed().size(), is(1));
+ }
+ verify(connection).prepareStatement("UPDATE tbl SET col = ?");
+ verify(preparedStatement, never()).cancel();
+ verify(preparedStatement).close();
+ assertTrue(closed.get());
+ }
+
@Test
void assertExecuteWithQueryResult() throws SQLException {
SQLStatementContext sqlStatementContext =
createSQLStatementContext(new SQLStatement(databaseType));
@@ -727,10 +925,10 @@ class StandardDatabaseProxyConnectorTest {
}
private ShardingSphereResultSetMetaData createResultSetMetaData() throws
SQLException {
- ShardingSphereResultSetMetaData result =
mock(ShardingSphereResultSetMetaData.class);
- when(result.getColumnLabel(1)).thenReturn("order_id");
- when(result.getColumnName(1)).thenReturn("order_id");
- return result;
+ ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
+ when(resultSetMetaData.getColumnLabel(1)).thenReturn("order_id");
+ when(resultSetMetaData.getColumnName(1)).thenReturn("order_id");
+ return new ShardingSphereResultSetMetaData(resultSetMetaData,
mock(ShardingSphereDatabase.class), null);
}
private ResponseHeader executeWithImplicitCommitCondition(final
SQLStatement sqlStatement, final String transactionType, final boolean
inTransaction,
@@ -803,6 +1001,22 @@ class StandardDatabaseProxyConnectorTest {
assertTrue(cachedStatements.isEmpty());
}
+ @Test
+ void assertCloseSkipCachedPreparedStatement() throws SQLException {
+ SQLStatementContext sqlStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+
when(sqlStatementContext.getTablesContext().getDatabaseNames()).thenReturn(Collections.emptyList());
+
when(sqlStatementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
+ DatabaseProxyConnector engine =
createDatabaseProxyConnector(JDBCDriverType.STATEMENT,
createQueryContext(sqlStatementContext, mockDatabase()));
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ Collection<Statement> cachedStatements = getField(engine,
"cachedStatements");
+ cachedStatements.add(preparedStatement);
+
when(databaseConnectionManager.getConnectionSession().getPreparedStatementCacheContext().contains(preparedStatement)).thenReturn(true);
+ engine.close();
+ verify(preparedStatement, never()).cancel();
+ verify(preparedStatement, never()).close();
+ assertTrue(cachedStatements.isEmpty());
+ }
+
@Test
void assertCloseResultSetsWithExceptionThrown() throws SQLException {
SQLStatementContext sqlStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
@@ -899,6 +1113,73 @@ class StandardDatabaseProxyConnectorTest {
return result;
}
+ private ConnectionSession createFirebirdConnectionSession(final
ContextManager contextManager) {
+
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+ ConnectionSession result = new ConnectionSession(firebirdDatabaseType,
null);
+ result.setGrantee(new Grantee("foo_user"));
+ result.setCurrentDatabaseName("foo_db");
+
result.getConnectionContext().getTransactionContext().beginTransaction("XA",
null);
+ result.getTransactionStatus().setInTransaction(true);
+ return result;
+ }
+
+ private FirebirdPreparedStatementExecutionTestContext
createFirebirdPreparedStatementExecutionTestContext(final String sql, final
Connection... connections) throws SQLException {
+ ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
+ ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class,
RETURNS_DEEP_STUBS);
+ ShardingSphereDatabase database = mock(ShardingSphereDatabase.class,
RETURNS_DEEP_STUBS);
+ StorageUnit storageUnit = mock(StorageUnit.class);
+ DatabaseType storageDatabaseType = mock(DatabaseType.class);
+ JDBCBackendDataSource backendDataSource =
mock(JDBCBackendDataSource.class);
+ when(metaData.containsDatabase("foo_db")).thenReturn(true);
+ when(metaData.getDatabase("foo_db")).thenReturn(database);
+
when(metaData.getAllDatabases()).thenReturn(Collections.singleton(database));
+
when(metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE)).thenReturn(1);
+
when(metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY)).thenReturn(1);
+ when(metaData.getGlobalRuleMetaData()).thenReturn(new
RuleMetaData(Collections.singletonList(sqlFederationRule)));
+ when(database.getName()).thenReturn("foo_db");
+ when(database.containsDataSource()).thenReturn(true);
+ when(database.isComplete()).thenReturn(true);
+ when(database.getProtocolType()).thenReturn(firebirdDatabaseType);
+
when(database.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
+
when(database.getRuleMetaData().getAttributes(DataNodeRuleAttribute.class)).thenReturn(Collections.emptyList());
+
when(database.getResourceMetaData().getStorageUnits()).thenReturn(Collections.singletonMap("ds",
storageUnit));
+ when(storageDatabaseType.getType()).thenReturn("Firebird");
+ when(storageUnit.getStorageType()).thenReturn(storageDatabaseType);
+
when(contextManager.getMetaDataContexts().getMetaData()).thenReturn(metaData);
+ when(contextManager.getDatabase("foo_db")).thenReturn(database);
+
when(ProxyContext.getInstance().getBackendDataSource()).thenReturn(backendDataSource);
+ if (1 == connections.length) {
+ when(backendDataSource.getConnections(eq("foo_db"), eq("ds"),
eq(1), any())).thenReturn(Collections.singletonList(connections[0]));
+ } else {
+ when(backendDataSource.getConnections(eq("foo_db"), eq("ds"),
eq(1), any())).thenReturn(Collections.singletonList(connections[0]),
Collections.singletonList(connections[1]));
+ }
+ ConnectionSession connectionSession =
createFirebirdConnectionSession(contextManager);
+ SQLStatementContext sqlStatementContext =
createSQLStatementContext(new SQLStatement(firebirdDatabaseType));
+ return new
FirebirdPreparedStatementExecutionTestContext(connectionSession, metaData,
sqlStatementContext,
+ new QueryContext(sqlStatementContext, sql,
Collections.singletonList(1), new HintValueContext(),
connectionSession.getConnectionContext(), metaData), sql);
+ }
+
+ private ExecutionContext createFirebirdExecutionContext(final
FirebirdPreparedStatementExecutionTestContext testContext, final int parameter)
{
+ return new ExecutionContext(new
QueryContext(testContext.sqlStatementContext, testContext.sql,
Collections.singletonList(parameter),
+ new HintValueContext(),
testContext.connectionSession.getConnectionContext(), testContext.metaData),
+ Collections.singletonList(new ExecutionUnit("ds", new
SQLUnit(testContext.sql, Collections.singletonList(parameter)))), mock());
+ }
+
+ private PreparedStatementCacheKey createPreparedStatementCacheKey(final
int statementId) {
+ return new PreparedStatementCacheKey("firebird:" + statementId);
+ }
+
+ private DatabaseProxyConnector
createFirebirdPreparedStatementConnector(final
FirebirdPreparedStatementExecutionTestContext testContext) throws SQLException {
+ DatabaseProxyConnector result = new StandardDatabaseProxyConnector(
+ JDBCDriverType.PREPARED_STATEMENT, testContext.queryContext,
testContext.connectionSession.getDatabaseConnectionManager());
+ ProxySQLExecutor proxySQLExecutor = getField(result,
"proxySQLExecutor");
+ ProxyJDBCExecutor proxyJDBCExecutor = mock(ProxyJDBCExecutor.class);
+ List<ExecuteResult> executeResults = Collections.singletonList(new
UpdateResult(1, 0L));
+ when(proxyJDBCExecutor.execute(any(), any(), eq(false),
anyBoolean())).thenReturn(executeResults);
+ setProxySQLExecutorField(proxySQLExecutor, "regularExecutor",
proxyJDBCExecutor);
+ return result;
+ }
+
@SuppressWarnings("unchecked")
@SneakyThrows(ReflectiveOperationException.class)
private <T> T getField(final DatabaseProxyConnector target, final String
fieldName) {
@@ -909,4 +1190,31 @@ class StandardDatabaseProxyConnectorTest {
private void setField(final DatabaseProxyConnector target, final String
fieldName, final Object value) {
Plugins.getMemberAccessor().set(StandardDatabaseProxyConnector.class.getDeclaredField(fieldName),
target, value);
}
+
+ @SneakyThrows(ReflectiveOperationException.class)
+ private void setProxySQLExecutorField(final ProxySQLExecutor target, final
String fieldName, final Object value) {
+
Plugins.getMemberAccessor().set(ProxySQLExecutor.class.getDeclaredField(fieldName),
target, value);
+ }
+
+ private static final class FirebirdPreparedStatementExecutionTestContext {
+
+ private final ConnectionSession connectionSession;
+
+ private final ShardingSphereMetaData metaData;
+
+ private final SQLStatementContext sqlStatementContext;
+
+ private final QueryContext queryContext;
+
+ private final String sql;
+
+ private FirebirdPreparedStatementExecutionTestContext(final
ConnectionSession connectionSession, final ShardingSphereMetaData metaData,
+ final
SQLStatementContext sqlStatementContext, final QueryContext queryContext, final
String sql) {
+ this.connectionSession = connectionSession;
+ this.metaData = metaData;
+ this.sqlStatementContext = sqlStatementContext;
+ this.queryContext = queryContext;
+ this.sql = sql;
+ }
+ }
}
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatementTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatementTest.java
index dbc5c999342..083d425c612 100644
---
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatementTest.java
+++
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatementTest.java
@@ -25,10 +25,16 @@ import
org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheContext;
import
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
import
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -38,10 +44,12 @@ import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
+import java.util.stream.Stream;
-import static org.hamcrest.Matchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -54,7 +62,8 @@ class JDBCBackendStatementTest {
@Test
void
assertCreateStorageResourceWithStatementSetsFetchSizeOnlyInMemoryStrictly()
throws SQLException {
- JDBCBackendStatement backendStatement = new JDBCBackendStatement();
+ ConnectionSession connectionSession = mock(ConnectionSession.class);
+ JDBCBackendStatement backendStatement = new
JDBCBackendStatement(connectionSession);
Connection connection = mock(Connection.class);
Statement statement = mock(Statement.class);
when(connection.createStatement()).thenReturn(statement);
@@ -70,7 +79,9 @@ class JDBCBackendStatementTest {
@Test
void assertCreateStorageResourceWithPreparedStatement() throws
SQLException {
- JDBCBackendStatement backendStatement = new JDBCBackendStatement();
+ ConnectionSession connectionSession = mock(ConnectionSession.class);
+
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.empty());
+ JDBCBackendStatement backendStatement = new
JDBCBackendStatement(connectionSession);
Connection connection = mock(Connection.class);
PreparedStatement preparedStatementWithGeneratedKeys =
mock(PreparedStatement.class);
PreparedStatement preparedStatement = mock(PreparedStatement.class);
@@ -90,4 +101,95 @@ class JDBCBackendStatementTest {
new ExecutionUnit("ds", new SQLUnit(sql,
Collections.emptyList())), connection, 0, ConnectionMode.CONNECTION_STRICTLY,
new StatementOption(false), databaseType);
assertThat(actualWithoutGeneratedKeys, is(preparedStatement));
}
+
+ @Test
+ void
assertCreateStorageResourceReusesPreparedStatementForSamePreparedStatementCacheKey()
throws SQLException {
+ ConnectionSession connectionSession = mock(ConnectionSession.class);
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.of(new
PreparedStatementCacheKey("statement-1")));
+ JDBCBackendStatement backendStatement = new
JDBCBackendStatement(connectionSession);
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ String sql = "SELECT * FROM foo WHERE id = ?";
+ when(connection.prepareStatement(sql)).thenReturn(preparedStatement);
+ StatementOption statementOption = new StatementOption(false);
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit(sql,
Collections.singletonList(1))), connection, 0,
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit(sql,
Collections.singletonList(2))), connection, 0,
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+ verify(connection).prepareStatement(sql);
+ verify(preparedStatement, times(2)).clearParameters();
+ verify(preparedStatement).setObject(1, 1);
+ verify(preparedStatement).setObject(1, 2);
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("nonFirebirdDatabaseTypes")
+ void
assertCreateStorageResourceWithoutPreparedStatementCacheWithoutActiveKey(final
String scenario, final String databaseTypeName) throws SQLException {
+ ConnectionSession connectionSession = mock(ConnectionSession.class);
+
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.empty());
+ JDBCBackendStatement backendStatement = new
JDBCBackendStatement(connectionSession);
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ String sql = "SELECT * FROM foo WHERE id = ?";
+ DatabaseType nonFirebirdDatabaseType = mock(DatabaseType.class);
+ when(connection.prepareStatement(sql)).thenReturn(preparedStatement);
+ StatementOption statementOption = new StatementOption(false);
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit(sql,
Collections.singletonList(1))), connection, 0,
ConnectionMode.CONNECTION_STRICTLY, statementOption, nonFirebirdDatabaseType);
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit(sql,
Collections.singletonList(2))), connection, 0,
ConnectionMode.CONNECTION_STRICTLY, statementOption, nonFirebirdDatabaseType);
+ verify(connection, times(2)).prepareStatement(sql);
+ }
+
+ @Test
+ void assertCreateStorageResourceWithDifferentPreparedStatementCacheKey()
throws SQLException {
+ ConnectionSession connectionSession = mock(ConnectionSession.class);
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+ JDBCBackendStatement backendStatement = new
JDBCBackendStatement(connectionSession);
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ String sql = "SELECT * FROM foo WHERE id = ?";
+ when(connection.prepareStatement(sql)).thenReturn(preparedStatement);
+ StatementOption statementOption = new StatementOption(false);
+
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(
+ Optional.of(new PreparedStatementCacheKey("statement-1")),
Optional.of(new PreparedStatementCacheKey("statement-2")));
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit(sql,
Collections.singletonList(1))), connection, 0,
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit(sql,
Collections.singletonList(2))), connection, 0,
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+ verify(connection, times(2)).prepareStatement(sql);
+ }
+
+ @Test
+ void assertCreateNewPreparedStatementAfterCacheInvalidation() throws
SQLException {
+ ConnectionSession connectionSession = mock(ConnectionSession.class);
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+ PreparedStatementCacheKey preparedStatementCacheKey = new
PreparedStatementCacheKey("statement-1");
+
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.of(preparedStatementCacheKey));
+ JDBCBackendStatement backendStatement = new
JDBCBackendStatement(connectionSession);
+ Connection connection = mock(Connection.class);
+ PreparedStatement firstPreparedStatement =
mock(PreparedStatement.class);
+ PreparedStatement secondPreparedStatement =
mock(PreparedStatement.class);
+ String sql = "SELECT * FROM foo WHERE id = ?";
+
when(connection.prepareStatement(sql)).thenReturn(firstPreparedStatement,
secondPreparedStatement);
+ StatementOption statementOption = new StatementOption(false);
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit(sql,
Collections.singletonList(1))), connection, 0,
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+ cacheContext.invalidate(preparedStatementCacheKey);
+ backendStatement.createStorageResource(
+ new ExecutionUnit("ds", new SQLUnit(sql,
Collections.singletonList(2))), connection, 0,
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+ verify(connection, times(2)).prepareStatement(sql);
+ verify(firstPreparedStatement).close();
+ }
+
+ private static Stream<Arguments> nonFirebirdDatabaseTypes() {
+ return Stream.of(
+
Arguments.of("createStorageResource_mysqlDoesNotUsePreparedStatementCache",
"MySQL"),
+
Arguments.of("createStorageResource_postgresqlDoesNotUsePreparedStatementCache",
"PostgreSQL"),
+
Arguments.of("createStorageResource_openGaussDoesNotUsePreparedStatementCache",
"openGauss"));
+ }
}
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
index 4a9fb7bc15f..6f52a4613ea 100644
---
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
+++
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
@@ -36,7 +36,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.internal.configuration.plugins.Plugins;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.is;
@@ -164,6 +168,26 @@ class ConnectionSessionTest {
assertNull(connectionSession.getQueryContext());
}
+ @Test
+ void assertPreparedStatementCacheContext() {
+ assertNotNull(connectionSession.getPreparedStatementCacheContext());
+ }
+
+ @Test
+ void assertPreparedStatementCacheLifecycle() throws SQLException {
+ PreparedStatementCacheKey preparedStatementCacheKey = new
PreparedStatementCacheKey("statement-1");
+
connectionSession.beginPreparedStatementCache(preparedStatementCacheKey);
+ assertThat(connectionSession.getCurrentPreparedStatementCacheKey(),
is(Optional.of(preparedStatementCacheKey)));
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+
connectionSession.getPreparedStatementCacheContext().getOrCreate(connection,
"SELECT 1", false, preparedStatementCacheKey, () -> preparedStatement);
+
connectionSession.invalidatePreparedStatementCache(preparedStatementCacheKey);
+
assertFalse(connectionSession.getPreparedStatementCacheContext().contains(preparedStatement));
+ connectionSession.finishPreparedStatementCache();
+ assertThat(connectionSession.getCurrentPreparedStatementCacheKey(),
is(Optional.empty()));
+ verify(preparedStatement).close();
+ }
+
@SuppressWarnings("unchecked")
private AtomicReference<ConnectionContext> getConnectionContextReference()
throws ReflectiveOperationException {
return (AtomicReference<ConnectionContext>)
Plugins.getMemberAccessor().get(ConnectionSession.class.getDeclaredField("connectionContext"),
connectionSession);
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContextTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContextTest.java
new file mode 100644
index 00000000000..219ff9f1aae
--- /dev/null
+++
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContextTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class PreparedStatementCacheContextTest {
+
+ @Test
+ void assertGetOrCreateWithSamePreparedStatementCacheKey() throws
SQLException {
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ PreparedStatementCacheKey preparedStatementCacheKey = new
PreparedStatementCacheKey("statement-1");
+ int[] invokedCount = {0};
+ PreparedStatement actualFirst = cacheContext.getOrCreate(connection,
"SELECT 1", false, preparedStatementCacheKey, () -> {
+ invokedCount[0]++;
+ return preparedStatement;
+ });
+ PreparedStatement actualSecond = cacheContext.getOrCreate(connection,
"SELECT 1", false, preparedStatementCacheKey, () -> {
+ invokedCount[0]++;
+ return mock(PreparedStatement.class);
+ });
+ assertThat(actualFirst, is(preparedStatement));
+ assertThat(actualSecond, is(preparedStatement));
+ assertThat(invokedCount[0], is(1));
+ assertTrue(cacheContext.contains(preparedStatement));
+ }
+
+ @Test
+ void assertGetOrCreateWithDifferentPreparedStatementCacheKey() throws
SQLException {
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+ Connection connection = mock(Connection.class);
+ PreparedStatement firstPreparedStatement =
mock(PreparedStatement.class);
+ PreparedStatement secondPreparedStatement =
mock(PreparedStatement.class);
+ PreparedStatement actualFirst = cacheContext.getOrCreate(connection,
"SELECT 1", false, new PreparedStatementCacheKey("statement-1"), () ->
firstPreparedStatement);
+ PreparedStatement actualSecond = cacheContext.getOrCreate(connection,
"SELECT 1", false, new PreparedStatementCacheKey("statement-2"), () ->
secondPreparedStatement);
+ assertThat(actualFirst, is(firstPreparedStatement));
+ assertThat(actualSecond, is(secondPreparedStatement));
+ }
+
+ @Test
+ void assertEvictWithLRU() throws SQLException {
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(2);
+ Connection connection = mock(Connection.class);
+ PreparedStatement first = mock(PreparedStatement.class);
+ PreparedStatement second = mock(PreparedStatement.class);
+ PreparedStatement third = mock(PreparedStatement.class);
+ cacheContext.getOrCreate(connection, "SELECT 1", false, new
PreparedStatementCacheKey("statement-1"), () -> first);
+ cacheContext.getOrCreate(connection, "SELECT 2", false, new
PreparedStatementCacheKey("statement-2"), () -> second);
+ cacheContext.getOrCreate(connection, "SELECT 1", false, new
PreparedStatementCacheKey("statement-1"), () -> first);
+ cacheContext.getOrCreate(connection, "SELECT 3", false, new
PreparedStatementCacheKey("statement-3"), () -> third);
+ assertFalse(cacheContext.contains(second));
+ assertTrue(cacheContext.contains(first));
+ assertTrue(cacheContext.contains(third));
+ verify(second).close();
+ }
+
+ @Test
+ void assertInvalidate() throws SQLException {
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ cacheContext.getOrCreate(connection, "SELECT 1", false, new
PreparedStatementCacheKey("statement-1"), () -> preparedStatement);
+ cacheContext.invalidate(preparedStatement);
+ assertFalse(cacheContext.contains(preparedStatement));
+ verify(preparedStatement).close();
+ }
+
+ @Test
+ void assertInvalidateByPreparedStatementCacheKey() throws SQLException {
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+ Connection connection = mock(Connection.class);
+ PreparedStatement first = mock(PreparedStatement.class);
+ PreparedStatement second = mock(PreparedStatement.class);
+ PreparedStatementCacheKey firstPreparedStatementCacheKey = new
PreparedStatementCacheKey("statement-1");
+ PreparedStatementCacheKey secondPreparedStatementCacheKey = new
PreparedStatementCacheKey("statement-2");
+ cacheContext.getOrCreate(connection, "SELECT 1", false,
firstPreparedStatementCacheKey, () -> first);
+ cacheContext.getOrCreate(connection, "SELECT 1", false,
secondPreparedStatementCacheKey, () -> second);
+ cacheContext.invalidate(firstPreparedStatementCacheKey);
+ assertFalse(cacheContext.contains(first));
+ assertTrue(cacheContext.contains(second));
+ verify(first).close();
+ }
+
+ @Test
+ void assertGetOrCreateWithClosedPreparedStatement() throws SQLException {
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+ Connection connection = mock(Connection.class);
+ PreparedStatement firstPreparedStatement =
mock(PreparedStatement.class);
+ PreparedStatement secondPreparedStatement =
mock(PreparedStatement.class);
+ when(firstPreparedStatement.isClosed()).thenReturn(true);
+ PreparedStatementCacheKey preparedStatementCacheKey = new
PreparedStatementCacheKey("statement-1");
+ cacheContext.getOrCreate(connection, "SELECT 1", false,
preparedStatementCacheKey, () -> firstPreparedStatement);
+ PreparedStatement actualPreparedStatement =
cacheContext.getOrCreate(connection, "SELECT 1", false,
preparedStatementCacheKey, () -> secondPreparedStatement);
+ assertThat(actualPreparedStatement, is(secondPreparedStatement));
+ verify(firstPreparedStatement).close();
+ }
+
+ @Test
+ void assertCloseAll() throws SQLException {
+ PreparedStatementCacheContext cacheContext = new
PreparedStatementCacheContext(8);
+ Connection connection = mock(Connection.class);
+ PreparedStatement first = mock(PreparedStatement.class);
+ PreparedStatement second = mock(PreparedStatement.class);
+ cacheContext.getOrCreate(connection, "SELECT 1", false, new
PreparedStatementCacheKey("statement-1"), () -> first);
+ cacheContext.getOrCreate(connection, "SELECT 2", false, new
PreparedStatementCacheKey("statement-2"), () -> second);
+ cacheContext.closeAll();
+ assertThat(cacheContext.size(), is(0));
+ verify(first).close();
+ verify(second).close();
+ }
+}
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKeyTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKeyTest.java
new file mode 100644
index 00000000000..302876f2b8a
--- /dev/null
+++
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKeyTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class PreparedStatementCacheKeyTest {
+
+ @Test
+ void assertGetToken() {
+ PreparedStatementCacheKey preparedStatementCacheKey = new
PreparedStatementCacheKey("statement-1");
+ assertThat(preparedStatementCacheKey.getToken(), is("statement-1"));
+ assertThat(preparedStatementCacheKey, is(new
PreparedStatementCacheKey("statement-1")));
+ }
+
+ @Test
+ void assertConstructorWithNullToken() {
+ NullPointerException actual = assertThrows(NullPointerException.class,
() -> new PreparedStatementCacheKey(null));
+ assertThat(actual.getMessage(), is("token cannot be null."));
+ }
+}
diff --git
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleaner.java
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleaner.java
new file mode 100644
index 00000000000..917bdbbf413
--- /dev/null
+++
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleaner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
+import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
+
+import java.sql.SQLException;
+
+/**
+ * Firebird statement resource cleaner.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class FirebirdStatementResourceCleaner {
+
+ private static final String PREPARED_STATEMENT_CACHE_KEY_PREFIX =
"firebird:";
+
+ /**
+ * Create prepared statement cache key.
+ *
+ * @param statementId statement ID
+ * @return prepared statement cache key
+ */
+ public static PreparedStatementCacheKey
createPreparedStatementCacheKey(final int statementId) {
+ return new
PreparedStatementCacheKey(PREPARED_STATEMENT_CACHE_KEY_PREFIX + statementId);
+ }
+
+ /**
+ * Clean Firebird statement resources.
+ *
+ * @param connectionSession connection session
+ * @param statementId statement ID
+ * @param invalidatePreparedStatementCache whether invalidate prepared
statement cache
+ * @throws SQLException SQL exception
+ */
+ public static void clean(final ConnectionSession connectionSession, final
int statementId, final boolean invalidatePreparedStatementCache) throws
SQLException {
+ ProxyBackendHandler proxyBackendHandler =
FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(connectionSession.getConnectionId(),
statementId);
+ if (null != proxyBackendHandler) {
+
connectionSession.getDatabaseConnectionManager().removeResource(proxyBackendHandler);
+
FirebirdFetchStatementCache.getInstance().unregisterStatement(connectionSession.getConnectionId(),
statementId);
+ connectionSession.getConnectionContext().clearCursorContext();
+ try {
+ proxyBackendHandler.close();
+ } finally {
+ invalidatePreparedStatementCacheIfNecessary(connectionSession,
statementId, invalidatePreparedStatementCache);
+ }
+ return;
+ }
+
FirebirdFetchStatementCache.getInstance().unregisterStatement(connectionSession.getConnectionId(),
statementId);
+ connectionSession.getConnectionContext().clearCursorContext();
+ invalidatePreparedStatementCacheIfNecessary(connectionSession,
statementId, invalidatePreparedStatementCache);
+ }
+
+ private static void invalidatePreparedStatementCacheIfNecessary(final
ConnectionSession connectionSession, final int statementId,
+ final
boolean invalidatePreparedStatementCache) {
+ if (invalidatePreparedStatementCache) {
+
connectionSession.invalidatePreparedStatementCache(createPreparedStatementCacheKey(statementId));
+ }
+ }
+}
diff --git
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutor.java
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutor.java
index 21b7410dbbd..890b941b1c6 100644
---
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutor.java
+++
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutor.java
@@ -44,6 +44,7 @@ import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.FirebirdServerPreparedStatement;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.blob.upload.FirebirdBlobUploadCache;
+import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
import java.sql.SQLException;
@@ -70,9 +71,30 @@ public final class FirebirdExecuteStatementCommandExecutor
implements CommandExe
@Override
public Collection<DatabasePacket> execute() throws SQLException {
- FirebirdServerPreparedStatement preparedStatement =
connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
- List<Object> params = packet.getParameterValues();
- final List<Long> blobIdsToRemove = bindBlobParameters(params);
+
connectionSession.beginPreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(packet.getStatementId()));
+ try {
+ FirebirdServerPreparedStatement preparedStatement =
connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
+ ResponseHeader responseHeader =
executePreparedStatement(preparedStatement, packet.getParameterValues());
+ if (responseHeader instanceof QueryResponseHeader) {
+ responseType = ResponseType.QUERY;
+
FirebirdFetchStatementCache.getInstance().registerStatement(connectionSession.getConnectionId(),
packet.getStatementId(), proxyBackendHandler);
+
connectionSession.getDatabaseConnectionManager().markResourceInUse(proxyBackendHandler);
+ } else {
+ responseType = ResponseType.UPDATE;
+ }
+ Collection<DatabasePacket> result = new LinkedList<>();
+ if (packet.isStoredProcedure() && proxyBackendHandler.next()) {
+ result.add(getSQLResponse());
+ }
+ result.add(new FirebirdGenericResponsePacket());
+ return result;
+ } finally {
+ connectionSession.finishPreparedStatementCache();
+ }
+ }
+
+ private ResponseHeader executePreparedStatement(final
FirebirdServerPreparedStatement preparedStatement, final List<Object> params)
throws SQLException {
+ List<Long> blobIdsToRemove = bindBlobParameters(params);
SQLStatementContext sqlStatementContext =
preparedStatement.getSqlStatementContext();
if (sqlStatementContext instanceof ParameterAware) {
((ParameterAware) sqlStatementContext).bindParameters(params);
@@ -80,22 +102,10 @@ public final class FirebirdExecuteStatementCommandExecutor
implements CommandExe
QueryContext queryContext = new QueryContext(sqlStatementContext,
preparedStatement.getSql(), params, preparedStatement.getHintValueContext(),
connectionSession.getConnectionContext(),
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData(),
true);
proxyBackendHandler =
ProxyBackendHandlerFactory.newInstance(TypedSPILoader.getService(DatabaseType.class,
"Firebird"), queryContext, connectionSession, true);
- ResponseHeader responseHeader = proxyBackendHandler.execute();
- if (responseHeader instanceof QueryResponseHeader) {
- responseType = ResponseType.QUERY;
-
FirebirdFetchStatementCache.getInstance().registerStatement(connectionSession.getConnectionId(),
packet.getStatementId(), proxyBackendHandler);
-
connectionSession.getDatabaseConnectionManager().markResourceInUse(proxyBackendHandler);
- } else {
- responseType = ResponseType.UPDATE;
- }
- if (responseHeader instanceof UpdateResponseHeader) {
+ ResponseHeader result = proxyBackendHandler.execute();
+ if (result instanceof UpdateResponseHeader) {
clearBlobUploads(blobIdsToRemove);
}
- Collection<DatabasePacket> result = new LinkedList<>();
- if (packet.isStoredProcedure() && proxyBackendHandler.next()) {
- result.add(getSQLResponse());
- }
- result.add(new FirebirdGenericResponsePacket());
return result;
}
diff --git
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCache.java
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCache.java
index 8f3e3320c48..3f62a03f073 100644
---
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCache.java
+++
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCache.java
@@ -61,7 +61,10 @@ public final class FirebirdFetchStatementCache {
* @param proxyBackendHandler proxy backend handler
*/
public void registerStatement(final int connectionId, final int
statementId, final ProxyBackendHandler proxyBackendHandler) {
- statementRegistry.get(connectionId).put(statementId,
proxyBackendHandler);
+ Map<Integer, ProxyBackendHandler> statements =
statementRegistry.get(connectionId);
+ if (null != statements) {
+ statements.put(statementId, proxyBackendHandler);
+ }
}
/**
@@ -72,7 +75,8 @@ public final class FirebirdFetchStatementCache {
* @return fetch response packets
*/
public ProxyBackendHandler getFetchBackendHandler(final int connectionId,
final int statementId) {
- return statementRegistry.get(connectionId).get(statementId);
+ Map<Integer, ProxyBackendHandler> statements =
statementRegistry.get(connectionId);
+ return null == statements ? null : statements.get(statementId);
}
/**
@@ -82,7 +86,10 @@ public final class FirebirdFetchStatementCache {
* @param statementId statement ID
*/
public void unregisterStatement(final int connectionId, final int
statementId) {
- statementRegistry.get(connectionId).remove(statementId);
+ Map<Integer, ProxyBackendHandler> statements =
statementRegistry.get(connectionId);
+ if (null != statements) {
+ statements.remove(statementId);
+ }
}
/**
diff --git
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutor.java
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutor.java
index 3d050685603..0c0eeed85ed 100644
---
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutor.java
+++
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutor.java
@@ -22,11 +22,11 @@ import
org.apache.shardingsphere.database.protocol.firebird.exception.FirebirdPr
import
org.apache.shardingsphere.database.protocol.firebird.packet.command.query.statement.FirebirdFreeStatementPacket;
import
org.apache.shardingsphere.database.protocol.firebird.packet.generic.FirebirdGenericResponsePacket;
import org.apache.shardingsphere.database.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
+import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
@@ -41,17 +41,15 @@ public final class FirebirdFreeStatementCommandExecutor
implements CommandExecut
private final ConnectionSession connectionSession;
@Override
- public Collection<DatabasePacket> execute() {
+ public Collection<DatabasePacket> execute() throws SQLException {
switch (packet.getOption()) {
case FirebirdFreeStatementPacket.DROP:
case FirebirdFreeStatementPacket.UNPREPARE:
connectionSession.getServerPreparedStatementRegistry().removePreparedStatement(packet.getStatementId());
+ FirebirdStatementResourceCleaner.clean(connectionSession,
packet.getStatementId(), true);
break;
case FirebirdFreeStatementPacket.CLOSE:
- connectionSession.getConnectionContext().clearCursorContext();
- ProxyBackendHandler proxyBackendHandler =
FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(connectionSession.getConnectionId(),
packet.getStatementId());
-
connectionSession.getDatabaseConnectionManager().unmarkResourceInUse(proxyBackendHandler);
-
FirebirdFetchStatementCache.getInstance().unregisterStatement(connectionSession.getConnectionId(),
packet.getStatementId());
+ FirebirdStatementResourceCleaner.clean(connectionSession,
packet.getStatementId(), false);
break;
default:
throw new FirebirdProtocolException("Unknown DSQL option type
%d", packet.getOption());
diff --git
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutor.java
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutor.java
index 6fb4cf809cb..0d98b257ed2 100644
---
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutor.java
+++
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutor.java
@@ -57,6 +57,7 @@ import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.FirebirdServerPreparedStatement;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.blob.metadata.FirebirdBlobColumnMetaData;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.blob.metadata.FirebirdBlobColumnMetaDataResolver;
+import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementIdGenerator;
import
org.apache.shardingsphere.sql.parser.statement.core.enums.TableSourceType;
import
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.ReturningSegment;
@@ -85,6 +86,7 @@ import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.tcl.Ro
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.tcl.SavepointStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
+import java.sql.SQLException;
import java.sql.Types;
import java.util.Collection;
import java.util.Collections;
@@ -106,15 +108,19 @@ public final class
FirebirdPrepareStatementCommandExecutor implements CommandExe
private ReturningSegment returningSegment;
@Override
- public Collection<DatabasePacket> execute() {
+ public Collection<DatabasePacket> execute() throws SQLException {
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
SQLParserRule sqlParserRule =
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "Firebird");
SQLStatement sqlStatement =
sqlParserRule.getSQLParserEngine(databaseType).parse(packet.getSQL(), true);
SQLStatementContext sqlStatementContext = new SQLBindEngine(
metaDataContexts.getMetaData(),
connectionSession.getCurrentDatabaseName(),
packet.getHintValueContext()).bind(sqlStatement);
+ int statementId = getStatementId();
+ if (packet.isValidStatementHandle()) {
+ FirebirdStatementResourceCleaner.clean(connectionSession,
statementId, true);
+ }
FirebirdServerPreparedStatement serverPreparedStatement = new
FirebirdServerPreparedStatement(packet.getSQL(), sqlStatementContext,
packet.getHintValueContext());
-
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(getStatementId(),
serverPreparedStatement);
+
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(statementId,
serverPreparedStatement);
return createResponse(sqlStatementContext, metaDataContexts);
}
diff --git
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleanerTest.java
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleanerTest.java
new file mode 100644
index 00000000000..c3d26827a41
--- /dev/null
+++
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleanerTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement;
+
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
+import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
+import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings(ProxyContext.class)
+class FirebirdStatementResourceCleanerTest {
+
+ private static final int CONNECTION_ID = 1;
+
+ private static final int STATEMENT_ID = 2;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private ConnectionSession connectionSession;
+
+ @Mock
+ private ProxyDatabaseConnectionManager connectionManager;
+
+ @Mock
+ private ConnectionContext connectionContext;
+
+ @Mock
+ private ProxyBackendHandler proxyBackendHandler;
+
+ @AfterEach
+ void tearDown() {
+
FirebirdFetchStatementCache.getInstance().unregisterStatement(CONNECTION_ID,
STATEMENT_ID);
+
FirebirdFetchStatementCache.getInstance().unregisterConnection(CONNECTION_ID);
+ }
+
+ @Test
+ void assertCreatePreparedStatementCacheKey() {
+
assertThat(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID).getToken(),
is("firebird:2"));
+ }
+
+ @Test
+ void assertCleanWithPreparedStatementCacheInvalidation() throws
SQLException {
+
FirebirdFetchStatementCache.getInstance().registerConnection(CONNECTION_ID);
+
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID,
STATEMENT_ID, proxyBackendHandler);
+ when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+
when(connectionSession.getDatabaseConnectionManager()).thenReturn(connectionManager);
+
when(connectionSession.getConnectionContext()).thenReturn(connectionContext);
+ FirebirdStatementResourceCleaner.clean(connectionSession,
STATEMENT_ID, true);
+ InOrder inOrder = inOrder(connectionManager, connectionContext,
proxyBackendHandler, connectionSession);
+ inOrder.verify(connectionManager).removeResource(proxyBackendHandler);
+ inOrder.verify(connectionContext).clearCursorContext();
+ inOrder.verify(proxyBackendHandler).close();
+
inOrder.verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
STATEMENT_ID));
+ }
+
+ @Test
+ void
assertCleanWithPreparedStatementCacheInvalidationWithoutFetchHandler() throws
SQLException {
+ when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+
when(connectionSession.getConnectionContext()).thenReturn(connectionContext);
+ FirebirdStatementResourceCleaner.clean(connectionSession,
STATEMENT_ID, true);
+ verify(connectionContext).clearCursorContext();
+
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+ verifyNoInteractions(connectionManager, proxyBackendHandler);
+ }
+
+ @Test
+ void assertCleanWithoutPreparedStatementCacheInvalidation() throws
SQLException {
+
FirebirdFetchStatementCache.getInstance().registerConnection(CONNECTION_ID);
+
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID,
STATEMENT_ID, proxyBackendHandler);
+ when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+
when(connectionSession.getDatabaseConnectionManager()).thenReturn(connectionManager);
+
when(connectionSession.getConnectionContext()).thenReturn(connectionContext);
+ FirebirdStatementResourceCleaner.clean(connectionSession,
STATEMENT_ID, false);
+ verify(connectionManager).removeResource(proxyBackendHandler);
+ verify(proxyBackendHandler).close();
+ verify(connectionContext).clearCursorContext();
+ verify(connectionSession,
never()).invalidatePreparedStatementCache(any());
+
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
STATEMENT_ID));
+ }
+
+ @Test
+ void assertCleanBeforeCloseExecutionResources() throws SQLException,
BackendConnectionException {
+ ConnectionSession actualConnectionSession =
mock(ConnectionSession.class, Answers.RETURNS_DEEP_STUBS);
+ ConnectionContext actualConnectionContext =
mock(ConnectionContext.class, Answers.RETURNS_DEEP_STUBS);
+ ContextManager contextManager = mock(ContextManager.class,
Answers.RETURNS_DEEP_STUBS);
+ ProxyBackendHandler actualProxyBackendHandler =
mock(ProxyBackendHandler.class);
+
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+
when(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules()).thenReturn(Collections.emptyList());
+
when(actualConnectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+
when(actualConnectionSession.getConnectionContext()).thenReturn(actualConnectionContext);
+
when(actualConnectionContext.getTransactionContext().getTransactionType()).thenReturn(Optional.of("XA"));
+
when(actualConnectionSession.getTransactionStatus().isInConnectionHeldTransaction(any())).thenReturn(true);
+ ProxyDatabaseConnectionManager actualConnectionManager = new
ProxyDatabaseConnectionManager(actualConnectionSession);
+
when(actualConnectionSession.getDatabaseConnectionManager()).thenReturn(actualConnectionManager);
+
FirebirdFetchStatementCache.getInstance().registerConnection(CONNECTION_ID);
+
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID,
STATEMENT_ID, actualProxyBackendHandler);
+ actualConnectionManager.add(actualProxyBackendHandler);
+ actualConnectionManager.markResourceInUse(actualProxyBackendHandler);
+ final AtomicBoolean closed = new AtomicBoolean(false);
+ org.mockito.Mockito.doAnswer(invocation -> {
+ if (closed.getAndSet(true)) {
+ throw new SQLException("close twice");
+ }
+ return null;
+ }).when(actualProxyBackendHandler).close();
+ FirebirdStatementResourceCleaner.clean(actualConnectionSession,
STATEMENT_ID, true);
+ actualConnectionManager.closeExecutionResources();
+ verify(actualProxyBackendHandler, times(1)).close();
+
verify(actualConnectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
STATEMENT_ID));
+ }
+}
diff --git
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutorTest.java
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutorTest.java
index d12ce19863e..fc408937cd4 100644
---
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutorTest.java
+++
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutorTest.java
@@ -46,6 +46,7 @@ import
org.apache.shardingsphere.proxy.backend.session.ServerPreparedStatementRe
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.FirebirdServerPreparedStatement;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.blob.upload.FirebirdBlobUploadCache;
+import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.SelectStatement;
import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.UpdateStatement;
@@ -77,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(AutoMockExtension.class)
@@ -156,6 +158,8 @@ class FirebirdExecuteStatementCommandExecutorTest {
assertThat(iterator.next(), isA(FirebirdGenericResponsePacket.class));
assertFalse(iterator.hasNext());
assertThat(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
STATEMENT_ID), is(proxyBackendHandler));
+
verify(connectionSession).beginPreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(1));
+ verify(connectionSession).finishPreparedStatementCache();
}
@Test
@@ -170,6 +174,8 @@ class FirebirdExecuteStatementCommandExecutorTest {
assertThat(executor.getResponseType(), is(ResponseType.UPDATE));
assertThat(actual.iterator().next(),
isA(FirebirdGenericResponsePacket.class));
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
STATEMENT_ID));
+
verify(connectionSession).beginPreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(2));
+ verify(connectionSession).finishPreparedStatementCache();
}
@Test
diff --git
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCacheTest.java
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCacheTest.java
index 31724987f81..3989ed7b92a 100644
---
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCacheTest.java
+++
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCacheTest.java
@@ -72,6 +72,11 @@ class FirebirdFetchStatementCacheTest {
assertNull(cache.getFetchBackendHandler(1, 11));
}
+ @Test
+ void assertGetFetchBackendHandlerWithoutConnection() {
+ assertNull(cache.getFetchBackendHandler(1, 10));
+ }
+
@Test
void assertUnregisterStatement() {
cache.registerConnection(1);
@@ -80,6 +85,12 @@ class FirebirdFetchStatementCacheTest {
assertNull(cache.getFetchBackendHandler(1, 10));
}
+ @Test
+ void assertUnregisterStatementWithoutConnection() {
+ cache.unregisterStatement(1, 10);
+ assertFalse(statementRegistry.containsKey(1));
+ }
+
@Test
void assertUnregisterConnection() {
cache.registerConnection(1);
diff --git
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCommandExecutorTest.java
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCommandExecutorTest.java
index c0bc6c15dea..4f3902edd64 100644
---
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCommandExecutorTest.java
+++
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCommandExecutorTest.java
@@ -84,13 +84,15 @@ class FirebirdFetchStatementCommandExecutorTest {
@Test
void assertExecuteWhenNoBackendHandler() throws SQLException {
executor = new FirebirdFetchStatementCommandExecutor(packet,
connectionSession);
- Collection<DatabasePacket> actualPackets = executor.execute();
- Iterator<DatabasePacket> packetIterator = actualPackets.iterator();
- FirebirdFetchResponsePacket actualPacket =
(FirebirdFetchResponsePacket) packetIterator.next();
- assertThat(actualPackets.size(), is(1));
- assertThat(actualPacket.getStatus(),
is(ISCConstants.FETCH_NO_MORE_ROWS));
- assertThat(actualPacket.getCount(), is(0));
- assertNull(actualPacket.getRow());
+ assertNoMoreRowsResponse(executor.execute());
+ }
+
+ @Test
+ void assertExecuteWhenNoBackendHandlerAfterSameHandleReprepare() throws
SQLException {
+
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID,
STATEMENT_ID, proxyBackendHandler);
+
FirebirdFetchStatementCache.getInstance().unregisterStatement(CONNECTION_ID,
STATEMENT_ID);
+ executor = new FirebirdFetchStatementCommandExecutor(packet,
connectionSession);
+ assertNoMoreRowsResponse(executor.execute());
}
@Test
@@ -139,4 +141,13 @@ class FirebirdFetchStatementCommandExecutorTest {
assertNull(actualNoMorePacket.getRow());
verify(databaseConnectionManager).unmarkResourceInUse(proxyBackendHandler);
}
+
+ private void assertNoMoreRowsResponse(final Collection<DatabasePacket>
actualPackets) {
+ Iterator<DatabasePacket> packetIterator = actualPackets.iterator();
+ FirebirdFetchResponsePacket actualPacket =
(FirebirdFetchResponsePacket) packetIterator.next();
+ assertThat(actualPackets.size(), is(1));
+ assertThat(actualPacket.getStatus(),
is(ISCConstants.FETCH_NO_MORE_ROWS));
+ assertThat(actualPacket.getCount(), is(0));
+ assertNull(actualPacket.getRow());
+ }
}
diff --git
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutorTest.java
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutorTest.java
index c1baea6e381..0fa8451bff4 100644
---
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutorTest.java
+++
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutorTest.java
@@ -21,15 +21,18 @@ import
org.apache.shardingsphere.database.protocol.firebird.exception.FirebirdPr
import
org.apache.shardingsphere.database.protocol.firebird.packet.command.query.statement.FirebirdFreeStatementPacket;
import
org.apache.shardingsphere.database.protocol.firebird.packet.generic.FirebirdGenericResponsePacket;
import org.apache.shardingsphere.database.protocol.packet.DatabasePacket;
-import
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.backend.session.ServerPreparedStatementRegistry;
+import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -37,11 +40,14 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.util.Collection;
+import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isA;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -62,9 +68,6 @@ class FirebirdFreeStatementCommandExecutorTest {
@Mock
private ServerPreparedStatementRegistry registry;
- @Mock
- private ProxyDatabaseConnectionManager connectionManager;
-
@Mock
private ProxyBackendHandler proxyBackendHandler;
@@ -75,7 +78,6 @@ class FirebirdFreeStatementCommandExecutorTest {
when(packet.getStatementId()).thenReturn(STATEMENT_ID);
when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(registry);
-
when(connectionSession.getDatabaseConnectionManager()).thenReturn(connectionManager);
}
@AfterEach
@@ -84,36 +86,45 @@ class FirebirdFreeStatementCommandExecutorTest {
FirebirdFetchStatementCache.getInstance().unregisterConnection(CONNECTION_ID);
}
- @Test
- void assertExecuteWithDrop() {
- when(packet.getOption()).thenReturn(FirebirdFreeStatementPacket.DROP);
- FirebirdFreeStatementCommandExecutor executor = new
FirebirdFreeStatementCommandExecutor(packet, connectionSession);
- Collection<DatabasePacket> actual = executor.execute();
- assertThat(actual.iterator().next(),
isA(FirebirdGenericResponsePacket.class));
- verify(registry).removePreparedStatement(1);
- }
-
- @Test
- void assertExecuteWithUnprepare() {
-
when(packet.getOption()).thenReturn(FirebirdFreeStatementPacket.UNPREPARE);
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("preparedStatementFreeOptions")
+ void assertExecuteWithPreparedStatementFreeOption(final String scenario,
final int option) throws Exception {
+ when(packet.getOption()).thenReturn(option);
FirebirdFreeStatementCommandExecutor executor = new
FirebirdFreeStatementCommandExecutor(packet, connectionSession);
Collection<DatabasePacket> actual = executor.execute();
assertThat(actual.iterator().next(),
isA(FirebirdGenericResponsePacket.class));
- verify(registry).removePreparedStatement(1);
+ verify(registry).removePreparedStatement(STATEMENT_ID);
+
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
STATEMENT_ID));
}
@Test
- void assertExecuteWithClose() {
+ void assertExecuteWithClose() throws Exception {
when(packet.getOption()).thenReturn(FirebirdFreeStatementPacket.CLOSE);
new FirebirdFreeStatementCommandExecutor(packet,
connectionSession).execute();
- verify(connectionSession.getConnectionContext()).clearCursorContext();
- verify(connectionManager).unmarkResourceInUse(proxyBackendHandler);
+ verify(connectionSession,
never()).invalidatePreparedStatementCache(any());
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
STATEMENT_ID));
}
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("preparedStatementFreeOptions")
+ void assertExecuteWithPreparedStatementFreeOptionWithoutFetchHandler(final
String scenario, final int option) throws Exception {
+
FirebirdFetchStatementCache.getInstance().unregisterStatement(CONNECTION_ID,
STATEMENT_ID);
+ when(packet.getOption()).thenReturn(option);
+ new FirebirdFreeStatementCommandExecutor(packet,
connectionSession).execute();
+ verify(registry).removePreparedStatement(STATEMENT_ID);
+
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+ }
+
@Test
void assertExecuteWithUnknownOption() {
when(packet.getOption()).thenReturn(999);
assertThrows(FirebirdProtocolException.class, new
FirebirdFreeStatementCommandExecutor(packet, connectionSession)::execute);
}
+
+ private static Stream<Arguments> preparedStatementFreeOptions() {
+ return Stream.of(
+
Arguments.of("execute_dropCleansPreparedStatementAndFetchResources",
FirebirdFreeStatementPacket.DROP),
+
Arguments.of("execute_unprepareCleansPreparedStatementAndFetchResources",
FirebirdFreeStatementPacket.UNPREPARE));
+ }
}
diff --git
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutorTest.java
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutorTest.java
index d950a6a3be0..99ef0734938 100644
---
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutorTest.java
+++
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutorTest.java
@@ -45,13 +45,18 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.backend.session.ServerPreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.FirebirdServerPreparedStatement;
+import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
+import
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
import org.apache.shardingsphere.sql.parser.engine.api.CacheOption;
import
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
import
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
@@ -77,6 +82,8 @@ import static org.mockito.Mockito.when;
@MockitoSettings(strictness = Strictness.LENIENT)
class FirebirdPrepareStatementCommandExecutorTest {
+ private static final int CONNECTION_ID = 1;
+
private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "Firebird");
@Mock
@@ -85,12 +92,24 @@ class FirebirdPrepareStatementCommandExecutorTest {
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ConnectionSession connectionSession;
+ @Mock
+ private ProxyDatabaseConnectionManager connectionManager;
+
+ @Mock
+ private ProxyBackendHandler proxyBackendHandler;
+
+ @Mock
+ private ConnectionContext connectionContext;
+
@BeforeEach
void setUp() {
- ConnectionContext connectionContext = new
ConnectionContext(Collections::emptySet, new Grantee("foo_user"));
+
FirebirdFetchStatementCache.getInstance().registerConnection(CONNECTION_ID);
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new
ServerPreparedStatementRegistry());
when(connectionSession.getCurrentDatabaseName()).thenReturn("foo_db");
when(connectionSession.getConnectionContext()).thenReturn(connectionContext);
+ when(connectionContext.getGrantee()).thenReturn(new
Grantee("foo_user"));
+ when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+
when(connectionSession.getDatabaseConnectionManager()).thenReturn(connectionManager);
when(packet.getSQL()).thenReturn("SELECT 1");
when(packet.getHintValueContext()).thenReturn(new HintValueContext());
when(packet.isValidStatementHandle()).thenReturn(true);
@@ -100,6 +119,12 @@ class FirebirdPrepareStatementCommandExecutorTest {
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts()).thenReturn(createMetaDataContexts());
}
+ @AfterEach
+ void tearDown() {
+
FirebirdFetchStatementCache.getInstance().unregisterStatement(CONNECTION_ID, 1);
+
FirebirdFetchStatementCache.getInstance().unregisterConnection(CONNECTION_ID);
+ }
+
private MetaDataContexts createMetaDataContexts() {
SQLParserRule parserRule = new SQLParserRule(new
SQLParserRuleConfiguration(new CacheOption(128, 1024L), new CacheOption(128,
1024L)));
RuleMetaData globalRuleMetaData = new
RuleMetaData(Collections.singleton(parserRule));
@@ -115,7 +140,7 @@ class FirebirdPrepareStatementCommandExecutorTest {
}
@Test
- void assertExecute() {
+ void assertExecute() throws Exception {
FirebirdPrepareStatementCommandExecutor executor = new
FirebirdPrepareStatementCommandExecutor(packet, connectionSession);
Collection<DatabasePacket> actual = executor.execute();
FirebirdGenericResponsePacket responsePacket =
(FirebirdGenericResponsePacket) actual.iterator().next();
@@ -127,7 +152,7 @@ class FirebirdPrepareStatementCommandExecutorTest {
}
@Test
- void assertDescribeCountReturnsBigintType() {
+ void assertDescribeCountReturnsBigintType() throws Exception {
when(packet.getSQL()).thenReturn("SELECT COUNT(*) FROM foo_tbl");
when(packet.nextItem()).thenReturn(true, true, true, true, true,
false);
when(packet.getCurrentItem()).thenReturn(
@@ -147,4 +172,22 @@ class FirebirdPrepareStatementCommandExecutorTest {
columnPacket.write(payload);
verify(payload).writeInt4LE(FirebirdBinaryColumnType.INT64.getValue()
+ 1);
}
+
+ @Test
+ void
assertExecuteWithValidStatementHandleCleansPreviousPreparedStatementResources()
throws Exception {
+
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(1,
new FirebirdServerPreparedStatement("SELECT 0",
mock(SelectStatementContext.class), new HintValueContext()));
+
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID, 1,
proxyBackendHandler);
+ FirebirdPrepareStatementCommandExecutor executor = new
FirebirdPrepareStatementCommandExecutor(packet, connectionSession);
+ executor.execute();
+
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(1));
+
assertThat(connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(1).getSql(),
is("SELECT 1"));
+
assertThat(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
1), is((ProxyBackendHandler) null));
+ }
+
+ @Test
+ void assertExecuteWithValidStatementHandleWithoutFetchHandler() throws
Exception {
+ FirebirdPrepareStatementCommandExecutor executor = new
FirebirdPrepareStatementCommandExecutor(packet, connectionSession);
+ executor.execute();
+
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(1));
+ }
}