This is an automated email from the ASF dual-hosted git repository.

terrymanu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dd873cc948a Add prepared statement caching mechanism in proxy (#38644)
dd873cc948a is described below

commit dd873cc948a72aec2522418e5eeca25479964c6e
Author: KazenkE <[email protected]>
AuthorDate: Sat Jun 6 00:36:04 2026 +0800

    Add prepared statement caching mechanism in proxy (#38644)
    
    * Add prepared statement caching mechanism
    
    * change prepared statement cache to Firebird-only connection-held scope
    
    * verify Firebird connection-held prepared statement cache reuse.
    
    * fix the problem with check spotless.
    
    * Fix same-handle Firebird prepare cleanup and prove reuse on real held 
backend path.
    
    * fix the problem with confliction.
    
    * Refine Firebird prepared statement cache cleanup.
    
    * fix Firebird cleanup order and add held-handler regressions.
---
 RELEASE-NOTES.md                                   |   1 +
 .../connector/ProxyDatabaseConnectionManager.java  |  11 +
 .../connector/StandardDatabaseProxyConnector.java  |   8 +
 .../jdbc/statement/JDBCBackendStatement.java       |  28 +-
 .../proxy/backend/session/ConnectionSession.java   |  40 ++-
 .../session/PreparedStatementCacheContext.java     | 220 ++++++++++++++
 .../backend/session/PreparedStatementCacheKey.java |  37 +++
 .../ProxyDatabaseConnectionManagerTest.java        |  62 +++-
 .../StandardDatabaseProxyConnectorTest.java        | 316 ++++++++++++++++++++-
 .../jdbc/statement/JDBCBackendStatementTest.java   | 108 ++++++-
 .../backend/session/ConnectionSessionTest.java     |  24 ++
 .../session/PreparedStatementCacheContextTest.java | 140 +++++++++
 .../session/PreparedStatementCacheKeyTest.java     |  40 +++
 .../FirebirdStatementResourceCleaner.java          |  79 ++++++
 .../FirebirdExecuteStatementCommandExecutor.java   |  44 +--
 .../fetch/FirebirdFetchStatementCache.java         |  13 +-
 .../free/FirebirdFreeStatementCommandExecutor.java |  12 +-
 .../FirebirdPrepareStatementCommandExecutor.java   |  10 +-
 .../FirebirdStatementResourceCleanerTest.java      | 157 ++++++++++
 ...irebirdExecuteStatementCommandExecutorTest.java |   6 +
 .../fetch/FirebirdFetchStatementCacheTest.java     |  11 +
 .../FirebirdFetchStatementCommandExecutorTest.java |  25 +-
 .../FirebirdFreeStatementCommandExecutorTest.java  |  53 ++--
 ...irebirdPrepareStatementCommandExecutorTest.java |  49 +++-
 24 files changed, 1422 insertions(+), 72 deletions(-)

diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 38ae7ae3675..e910c843972 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -48,6 +48,7 @@
 1. Sharding: Fix HASH_MOD routing mismatch for same negative numeric values 
across numeric Java types with compatibility switch 
`normalize-numeric-int-range` - 
[#38327](https://github.com/apache/shardingsphere/pull/38327)
 1. Proxy: Support non column projection for MySQL prepared statement in Proxy 
- [#38507](https://github.com/apache/shardingsphere/pull/38507)
 1. Proxy: Support driverClassName config in proxy storage unit to solve mysql 
and mariadb jdbc url conflict - 
[#38582](https://github.com/apache/shardingsphere/pull/38582)
+1. Proxy: Support Firebird prepared statement cache reuse for held connections 
- [#38644](https://github.com/apache/shardingsphere/pull/38644)
 
 ## Release 5.5.3
 
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
index 68c7901dbc7..b65f2a70589 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
@@ -248,6 +248,16 @@ public final class ProxyDatabaseConnectionManager 
implements DatabaseConnectionM
         inUseProxyBackendHandlers.remove(handler);
     }
     
+    /**
+     * Remove proxy backend handler resource.
+     *
+     * @param handler proxy backend handler to be removed
+     */
+    public void removeResource(final ProxyBackendHandler handler) {
+        proxyBackendHandlers.remove(handler);
+        inUseProxyBackendHandlers.remove(handler);
+    }
+    
     /**
      * Handle auto commit.
      */
@@ -349,6 +359,7 @@ public final class ProxyDatabaseConnectionManager 
implements DatabaseConnectionM
             }
             cachedConnections.clear();
         }
+        connectionSession.getPreparedStatementCacheContext().closeAll();
         if (!forceRollback) {
             connectionPostProcessors.clear();
         }
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnector.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnector.java
index 3ad376a8aae..3076ab1fded 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnector.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnector.java
@@ -85,6 +85,7 @@ import 
org.apache.shardingsphere.transaction.api.TransactionType;
 import 
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
 
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -393,6 +394,9 @@ public final class StandardDatabaseProxyConnector 
implements DatabaseProxyConnec
     private Collection<SQLException> closeStatements() {
         Collection<SQLException> result = new LinkedList<>();
         for (Statement each : cachedStatements) {
+            if (isCachedPreparedStatement(each)) {
+                continue;
+            }
             try {
                 each.cancel();
                 each.close();
@@ -404,6 +408,10 @@ public final class StandardDatabaseProxyConnector 
implements DatabaseProxyConnec
         return result;
     }
     
+    private boolean isCachedPreparedStatement(final Statement statement) {
+        return statement instanceof PreparedStatement && 
databaseConnectionManager.getConnectionSession().getPreparedStatementCacheContext().contains(statement);
+    }
+    
     private Optional<SQLException> closeSQLFederationEngine() {
         if (null != proxySQLExecutor.getSqlFederationEngine()) {
             try {
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
index 40a052e5148..36fc7af8580 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
@@ -24,6 +24,9 @@ import 
org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheContext;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -33,12 +36,19 @@ import java.sql.Types;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.Objects;
 
 /**
  * JDBC backend statement.
  */
 public final class JDBCBackendStatement implements 
ExecutorJDBCStatementManager {
     
+    private final ConnectionSession connectionSession;
+    
+    public JDBCBackendStatement(final ConnectionSession connectionSession) {
+        this.connectionSession = Objects.requireNonNull(connectionSession, 
"connectionSession cannot be null.");
+    }
+    
     @Override
     public Statement createStorageResource(final Connection connection, final 
ConnectionMode connectionMode, final StatementOption option, final DatabaseType 
databaseType) throws SQLException {
         Statement result = connection.createStatement();
@@ -53,9 +63,8 @@ public final class JDBCBackendStatement implements 
ExecutorJDBCStatementManager
                                            final ConnectionMode 
connectionMode, final StatementOption option, final DatabaseType databaseType) 
throws SQLException {
         String sql = executionUnit.getSqlUnit().getSql();
         List<Object> params = executionUnit.getSqlUnit().getParameters();
-        PreparedStatement result = option.isReturnGeneratedKeys()
-                ? 
connection.prepareStatement(executionUnit.getSqlUnit().getSql(), 
Statement.RETURN_GENERATED_KEYS)
-                : connection.prepareStatement(sql);
+        PreparedStatement result = getPreparedStatement(connection, sql, 
option.isReturnGeneratedKeys());
+        result.clearParameters();
         Iterator<Object> paramIterator = params.iterator();
         int index = 0;
         while (paramIterator.hasNext()) {
@@ -73,6 +82,19 @@ public final class JDBCBackendStatement implements 
ExecutorJDBCStatementManager
         return result;
     }
     
+    private PreparedStatement getPreparedStatement(final Connection 
connection, final String sql, final boolean returnGeneratedKeys) throws 
SQLException {
+        Optional<PreparedStatementCacheKey> preparedStatementCacheKey = 
connectionSession.getCurrentPreparedStatementCacheKey();
+        if (!preparedStatementCacheKey.isPresent()) {
+            return createPreparedStatement(connection, sql, 
returnGeneratedKeys);
+        }
+        PreparedStatementCacheContext cacheContext = 
connectionSession.getPreparedStatementCacheContext();
+        return cacheContext.getOrCreate(connection, sql, returnGeneratedKeys, 
preparedStatementCacheKey.get(), () -> createPreparedStatement(connection, sql, 
returnGeneratedKeys));
+    }
+    
+    private PreparedStatement createPreparedStatement(final Connection 
connection, final String sql, final boolean returnGeneratedKeys) throws 
SQLException {
+        return returnGeneratedKeys ? connection.prepareStatement(sql, 
Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql);
+    }
+    
     private void setFetchSize(final Statement statement, final DatabaseType 
databaseType) throws SQLException {
         Optional<StatementMemoryStrictlyFetchSizeSetter> fetchSizeSetter = 
DatabaseTypedSPILoader.findService(StatementMemoryStrictlyFetchSizeSetter.class,
 databaseType);
         if (fetchSizeSetter.isPresent()) {
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
index d3a6e28989f..cc273f83486 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
@@ -65,10 +65,14 @@ public final class ConnectionSession {
     @SuppressWarnings("rawtypes")
     private final ExecutorStatementManager statementManager;
     
+    private final PreparedStatementCacheContext preparedStatementCacheContext 
= new PreparedStatementCacheContext();
+    
     private final ServerPreparedStatementRegistry 
serverPreparedStatementRegistry = new ServerPreparedStatementRegistry();
     
     private final AtomicReference<ConnectionContext> connectionContext = new 
AtomicReference<>();
     
+    private final AtomicReference<Optional<PreparedStatementCacheKey>> 
currentPreparedStatementCacheKey = new AtomicReference<>(Optional.empty());
+    
     private final RequiredSessionVariableRecorder 
requiredSessionVariableRecorder = new RequiredSessionVariableRecorder();
     
     private volatile String processId;
@@ -80,7 +84,7 @@ public final class ConnectionSession {
         transactionStatus = new TransactionStatus();
         this.attributeMap = attributeMap;
         databaseConnectionManager = new ProxyDatabaseConnectionManager(this);
-        statementManager = new JDBCBackendStatement();
+        statementManager = new JDBCBackendStatement(this);
     }
     
     /**
@@ -131,6 +135,40 @@ public final class ConnectionSession {
         return Optional.ofNullable(isolationLevel);
     }
     
+    /**
+     * Begin prepared statement cache.
+     *
+     * @param cacheKey prepared statement cache key
+     */
+    public void beginPreparedStatementCache(final PreparedStatementCacheKey 
cacheKey) {
+        currentPreparedStatementCacheKey.set(Optional.of(cacheKey));
+    }
+    
+    /**
+     * Get current prepared statement cache key.
+     *
+     * @return current prepared statement cache key
+     */
+    public Optional<PreparedStatementCacheKey> 
getCurrentPreparedStatementCacheKey() {
+        return currentPreparedStatementCacheKey.get();
+    }
+    
+    /**
+     * Finish prepared statement cache.
+     */
+    public void finishPreparedStatementCache() {
+        currentPreparedStatementCacheKey.set(Optional.empty());
+    }
+    
+    /**
+     * Invalidate prepared statement cache.
+     *
+     * @param cacheKey prepared statement cache key
+     */
+    public void invalidatePreparedStatementCache(final 
PreparedStatementCacheKey cacheKey) {
+        preparedStatementCacheContext.invalidate(cacheKey);
+    }
+    
     /**
      * Clear query context.
      */
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContext.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContext.java
new file mode 100644
index 00000000000..8704474a7c6
--- /dev/null
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContext.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import lombok.Getter;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Prepared statement cache context.
+ */
+public final class PreparedStatementCacheContext {
+    
+    private static final int DEFAULT_MAX_SIZE = 128;
+    
+    @Getter
+    private final int maxSize;
+    
+    private final Map<CacheKey, PreparedStatement> cachedPreparedStatements = 
new LinkedHashMap<>(16, 0.75F, true);
+    
+    public PreparedStatementCacheContext() {
+        this(DEFAULT_MAX_SIZE);
+    }
+    
+    public PreparedStatementCacheContext(final int maxSize) {
+        this.maxSize = Math.max(1, maxSize);
+    }
+    
+    /**
+     * Get or create prepared statement.
+     *
+     * @param connection connection
+     * @param sql SQL
+     * @param returnGeneratedKeys return generated keys flag
+     * @param preparedStatementCacheKey prepared statement cache key
+     * @param supplier prepared statement supplier
+     * @return prepared statement
+     * @throws SQLException SQL exception
+     */
+    public synchronized PreparedStatement getOrCreate(final Connection 
connection, final String sql, final boolean returnGeneratedKeys,
+                                                      final 
PreparedStatementCacheKey preparedStatementCacheKey, final 
PreparedStatementSupplier supplier) throws SQLException {
+        CacheKey cacheKey = new CacheKey(connection, sql, returnGeneratedKeys, 
preparedStatementCacheKey);
+        PreparedStatement cachedPreparedStatement = 
cachedPreparedStatements.get(cacheKey);
+        if (null != cachedPreparedStatement && 
!isClosed(cachedPreparedStatement)) {
+            return cachedPreparedStatement;
+        }
+        if (null != cachedPreparedStatement) {
+            cachedPreparedStatements.remove(cacheKey);
+            closeQuietly(cachedPreparedStatement);
+        }
+        PreparedStatement result = supplier.get();
+        cachedPreparedStatements.put(cacheKey, result);
+        evictIfNecessary();
+        return result;
+    }
+    
+    /**
+     * Check whether statement is cached.
+     *
+     * @param statement statement
+     * @return cached or not
+     */
+    public synchronized boolean contains(final Statement statement) {
+        return cachedPreparedStatements.containsValue(statement);
+    }
+    
+    /**
+     * Invalidate cached statement.
+     *
+     * @param statement statement
+     */
+    public synchronized void invalidate(final Statement statement) {
+        Iterator<Entry<CacheKey, PreparedStatement>> iterator = 
cachedPreparedStatements.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Entry<CacheKey, PreparedStatement> entry = iterator.next();
+            if (entry.getValue() == statement) {
+                iterator.remove();
+                closeQuietly(entry.getValue());
+                break;
+            }
+        }
+    }
+    
+    /**
+     * Invalidate cached statements by prepared statement cache key.
+     *
+     * @param preparedStatementCacheKey prepared statement cache key
+     */
+    public synchronized void invalidate(final PreparedStatementCacheKey 
preparedStatementCacheKey) {
+        Iterator<Entry<CacheKey, PreparedStatement>> iterator = 
cachedPreparedStatements.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Entry<CacheKey, PreparedStatement> entry = iterator.next();
+            if 
(preparedStatementCacheKey.equals(entry.getKey().preparedStatementCacheKey)) {
+                iterator.remove();
+                closeQuietly(entry.getValue());
+            }
+        }
+    }
+    
+    /**
+     * Close all cached statements.
+     */
+    public synchronized void closeAll() {
+        for (PreparedStatement each : cachedPreparedStatements.values()) {
+            closeQuietly(each);
+        }
+        cachedPreparedStatements.clear();
+    }
+    
+    /**
+     * Get cache size.
+     *
+     * @return cache size
+     */
+    public synchronized int size() {
+        return cachedPreparedStatements.size();
+    }
+    
+    private void evictIfNecessary() {
+        while (cachedPreparedStatements.size() > maxSize) {
+            Iterator<Entry<CacheKey, PreparedStatement>> iterator = 
cachedPreparedStatements.entrySet().iterator();
+            if (!iterator.hasNext()) {
+                break;
+            }
+            Entry<CacheKey, PreparedStatement> eldest = iterator.next();
+            iterator.remove();
+            closeQuietly(eldest.getValue());
+        }
+    }
+    
+    private boolean isClosed(final PreparedStatement preparedStatement) {
+        try {
+            return preparedStatement.isClosed();
+        } catch (final SQLException ignored) {
+            return true;
+        }
+    }
+    
+    private void closeQuietly(final PreparedStatement preparedStatement) {
+        try {
+            preparedStatement.close();
+        } catch (final SQLException ignored) {
+        }
+    }
+    
+    @FunctionalInterface
+    public interface PreparedStatementSupplier {
+        
+        /**
+         * Get prepared statement.
+         *
+         * @return prepared statement
+         * @throws SQLException SQL exception
+         */
+        PreparedStatement get() throws SQLException;
+    }
+    
+    private static final class CacheKey {
+        
+        private final Connection connection;
+        
+        private final String sql;
+        
+        private final boolean returnGeneratedKeys;
+        
+        private final PreparedStatementCacheKey preparedStatementCacheKey;
+        
+        private final int connectionIdentityHash;
+        
+        private CacheKey(final Connection connection, final String sql, final 
boolean returnGeneratedKeys, final PreparedStatementCacheKey 
preparedStatementCacheKey) {
+            this.connection = connection;
+            this.sql = sql;
+            this.returnGeneratedKeys = returnGeneratedKeys;
+            this.preparedStatementCacheKey = 
Objects.requireNonNull(preparedStatementCacheKey, "preparedStatementCacheKey 
cannot be null.");
+            connectionIdentityHash = System.identityHashCode(connection);
+        }
+        
+        @Override
+        public boolean equals(final Object obj) {
+            if (!(obj instanceof CacheKey)) {
+                return false;
+            }
+            CacheKey other = (CacheKey) obj;
+            return connection == other.connection && returnGeneratedKeys == 
other.returnGeneratedKeys && sql.equals(other.sql)
+                    && 
preparedStatementCacheKey.equals(other.preparedStatementCacheKey);
+        }
+        
+        @Override
+        public int hashCode() {
+            int result = connectionIdentityHash;
+            result = 31 * result + sql.hashCode();
+            result = 31 * result + Boolean.hashCode(returnGeneratedKeys);
+            result = 31 * result + preparedStatementCacheKey.hashCode();
+            return result;
+        }
+    }
+}
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKey.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKey.java
new file mode 100644
index 00000000000..5f9b4ed0805
--- /dev/null
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKey.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import java.util.Objects;
+
+/**
+ * Prepared statement cache key.
+ */
+@Getter
+@EqualsAndHashCode
+public final class PreparedStatementCacheKey {
+    
+    private final String token;
+    
+    public PreparedStatementCacheKey(final String token) {
+        this.token = Objects.requireNonNull(token, "token cannot be null.");
+    }
+}
diff --git 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManagerTest.java
 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManagerTest.java
index 30726adfb8c..d774d3c825e 100644
--- 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManagerTest.java
+++ 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManagerTest.java
@@ -22,7 +22,10 @@ import lombok.SneakyThrows;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -43,6 +46,8 @@ import 
org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheContext;
 import 
org.apache.shardingsphere.proxy.backend.session.RequiredSessionVariableRecorder;
 import 
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
 import 
org.apache.shardingsphere.sql.parser.statement.core.enums.TransactionIsolationLevel;
@@ -67,6 +72,7 @@ import org.mockito.quality.Strictness;
 
 import java.lang.reflect.Field;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -95,6 +101,7 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
@@ -125,9 +132,10 @@ class ProxyDatabaseConnectionManagerTest {
         
when(connectionSession.getConnectionContext().getTransactionContext()).thenReturn(new
 TransactionConnectionContext());
         when(connectionSession.getTransactionStatus()).thenReturn(new 
TransactionStatus());
         
when(connectionSession.getUsedDatabaseName()).thenReturn(String.format(SCHEMA_PATTERN,
 0));
+        
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.empty());
         databaseConnectionManager = new 
ProxyDatabaseConnectionManager(connectionSession);
         
when(connectionSession.getDatabaseConnectionManager()).thenReturn(databaseConnectionManager);
-        JDBCBackendStatement backendStatement = new JDBCBackendStatement();
+        JDBCBackendStatement backendStatement = new 
JDBCBackendStatement(connectionSession);
         
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
         
when(connectionSession.getRequiredSessionVariableRecorder()).thenReturn(new 
RequiredSessionVariableRecorder());
     }
@@ -261,6 +269,18 @@ class ProxyDatabaseConnectionManagerTest {
         verifyConnectionPostProcessorsEmpty();
     }
     
+    @Test
+    void assertCloseConnectionsClosesPreparedStatementCache() throws 
SQLException {
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        Connection connection = prepareCachedConnections();
+        cacheContext.getOrCreate(connection, "SELECT 1", false, new 
PreparedStatementCacheKey("statement-1"), () -> preparedStatement);
+        
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+        databaseConnectionManager.closeConnections(false);
+        verify(preparedStatement).close();
+        assertThat(cacheContext.size(), is(0));
+    }
+    
     @SuppressWarnings("unchecked")
     @SneakyThrows(ReflectiveOperationException.class)
     private void verifyConnectionPostProcessorsEmpty() {
@@ -444,6 +464,18 @@ class ProxyDatabaseConnectionManagerTest {
         assertTrue(actual.isEmpty());
     }
     
+    @Test
+    void assertRemoveDatabaseProxyConnectorResource() {
+        ProxyBackendHandler engine = mock(DatabaseProxyConnector.class);
+        Collection<ProxyBackendHandler> backendHandlers = 
getProxyBackendHandlers();
+        Collection<ProxyBackendHandler> inUseProxyBackendHandlers = 
getInUseProxyBackendHandlers();
+        backendHandlers.add(engine);
+        inUseProxyBackendHandlers.add(engine);
+        databaseConnectionManager.removeResource(engine);
+        assertFalse(backendHandlers.contains(engine));
+        assertFalse(inUseProxyBackendHandlers.contains(engine));
+    }
+    
     @Test
     void assertCloseHandlers() throws SQLException {
         ProxyBackendHandler engine = mock(DatabaseProxyConnector.class);
@@ -496,6 +528,34 @@ class ProxyDatabaseConnectionManagerTest {
         assertThat(getInUseProxyBackendHandlers(), 
is(Collections.singleton(inUseHandler)));
     }
     
+    @Test
+    void 
assertCloseExecutionResourcesKeepsFirebirdPreparedStatementReuseInHeldTransaction()
 throws SQLException, BackendConnectionException {
+        connectionSession.getTransactionStatus().setInTransaction(true);
+        
connectionSession.getConnectionContext().getTransactionContext().beginTransaction(TransactionType.XA.name(),
 null);
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+        
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.of(new
 PreparedStatementCacheKey("statement-1")));
+        DatabaseType firebirdDatabaseType = mock(DatabaseType.class);
+        JDBCBackendStatement backendStatement = new 
JDBCBackendStatement(connectionSession);
+        Connection cachedConnection = prepareCachedConnections();
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        when(cachedConnection.prepareStatement("SELECT 
1")).thenReturn(preparedStatement);
+        StatementOption statementOption = new StatementOption(false);
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit("SELECT 1", 
Collections.singletonList(1))),
+                cachedConnection, 0, ConnectionMode.CONNECTION_STRICTLY, 
statementOption, firebirdDatabaseType);
+        databaseConnectionManager.closeExecutionResources();
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit("SELECT 1", 
Collections.singletonList(2))),
+                cachedConnection, 0, ConnectionMode.CONNECTION_STRICTLY, 
statementOption, firebirdDatabaseType);
+        verify(cachedConnection, times(1)).prepareStatement("SELECT 1");
+        verify(preparedStatement, times(2)).clearParameters();
+        verify(preparedStatement).setObject(1, 1);
+        verify(preparedStatement).setObject(1, 2);
+        verify(cachedConnection, never()).close();
+        assertThat(cacheContext.size(), is(1));
+    }
+    
     @Test
     void assertCloseExecutionResourcesInTransactionWhenClosed() throws 
BackendConnectionException, SQLException {
         connectionSession.getTransactionStatus().setInTransaction(true);
diff --git 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnectorTest.java
 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnectorTest.java
index d7e2c9e0ad9..446ff4fd765 100644
--- 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnectorTest.java
+++ 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseProxyConnectorTest.java
@@ -32,12 +32,15 @@ import 
org.apache.shardingsphere.infra.binder.context.statement.type.ddl.CursorH
 import 
org.apache.shardingsphere.infra.binder.context.statement.type.ddl.CursorStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.type.dml.InsertStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import 
org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.EmptyStorageUnitException;
 import 
org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
@@ -49,6 +52,7 @@ import 
org.apache.shardingsphere.infra.merge.result.impl.memory.MemoryMergedResu
 import 
org.apache.shardingsphere.infra.merge.result.impl.memory.MemoryQueryResultRow;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
@@ -72,15 +76,20 @@ import 
org.apache.shardingsphere.mode.metadata.refresher.pushdown.PushDownMetaDa
 import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import 
org.apache.shardingsphere.proxy.backend.connector.jdbc.fixture.QueryHeaderBuilderFixture;
+import 
org.apache.shardingsphere.proxy.backend.connector.jdbc.datasource.JDBCBackendDataSource;
+import 
org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.ProxyJDBCExecutor;
 import 
org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
 import 
org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.ProxyBackendTransactionManager;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import 
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilder;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sql.parser.engine.api.CacheOption;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.cursor.CursorNameSegment;
@@ -95,6 +104,7 @@ import 
org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
 import org.junit.jupiter.api.AfterEach;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -107,6 +117,8 @@ import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -118,6 +130,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -128,14 +141,19 @@ import static 
org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -146,6 +164,8 @@ class StandardDatabaseProxyConnectorTest {
     
     private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
     
+    private final DatabaseType firebirdDatabaseType = 
TypedSPILoader.getService(DatabaseType.class, "Firebird");
+    
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ProxyDatabaseConnectionManager databaseConnectionManager;
     
@@ -497,6 +517,184 @@ class StandardDatabaseProxyConnectorTest {
         }
     }
     
+    @Test
+    void assertExecuteWithFirebirdPreparedStatementReuseInHeldConnection() 
throws SQLException, BackendConnectionException {
+        Connection connection = mock(Connection.class);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        FirebirdPreparedStatementExecutionTestContext testContext = 
createFirebirdPreparedStatementExecutionTestContext("UPDATE tbl SET col = ?", 
connection);
+        when(connection.prepareStatement("UPDATE tbl SET col = 
?")).thenReturn(preparedStatement);
+        ExecutionContext firstExecutionContext = 
createFirebirdExecutionContext(testContext, 1);
+        ExecutionContext secondExecutionContext = 
createFirebirdExecutionContext(testContext, 2);
+        DialectDatabaseMetaData dialectDatabaseMetaData = 
mock(DialectDatabaseMetaData.class, RETURNS_DEEP_STUBS);
+        try (
+                MockedConstruction<DatabaseTypeRegistry> 
mockedDatabaseTypeRegistry = mockConstruction(DatabaseTypeRegistry.class,
+                        (mock, context) -> {
+                            
when(mock.getDefaultSchemaName("foo_db")).thenReturn("foo_db");
+                            
when(mock.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData);
+                        });
+                MockedConstruction<KernelProcessor> mockedKernelProcessor = 
mockConstruction(KernelProcessor.class,
+                        (mock, context) -> 
when(mock.generateExecutionContext(any(QueryContext.class), 
any(RuleMetaData.class), any(ConfigurationProperties.class)))
+                                .thenReturn(1 == context.getCount() ? 
firstExecutionContext : secondExecutionContext));
+                MockedStatic<ShardingSphereServiceLoader> serviceLoader = 
mockStatic(ShardingSphereServiceLoader.class, CALLS_REAL_METHODS)) {
+            DatabaseProxyConnector engine = 
createFirebirdPreparedStatementConnector(testContext);
+            serviceLoader.when(() -> 
ShardingSphereServiceLoader.getServiceInstances(AdvancedProxySQLExecutor.class)).thenReturn(Collections.emptyList());
+            
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+            assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+            
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
 is(1));
+            testContext.connectionSession.finishPreparedStatementCache();
+            
testContext.connectionSession.getDatabaseConnectionManager().closeExecutionResources();
+            
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
 is(1));
+            
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+            assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+            testContext.connectionSession.finishPreparedStatementCache();
+            assertTrue(mockedDatabaseTypeRegistry.constructed().size() >= 3);
+            assertThat(mockedKernelProcessor.constructed().size(), is(2));
+        }
+        verify(connection).prepareStatement("UPDATE tbl SET col = ?");
+        verify(preparedStatement, times(2)).clearParameters();
+        verify(preparedStatement).setObject(1, 1);
+        verify(preparedStatement).setObject(1, 2);
+        verify(connection, never()).close();
+        
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
 is(1));
+    }
+    
+    @Test
+    void 
assertExecuteWithNewPreparedStatementAfterFirebirdPreparedStatementInvalidation()
 throws SQLException, BackendConnectionException {
+        Connection connection = mock(Connection.class);
+        PreparedStatement firstPreparedStatement = 
mock(PreparedStatement.class);
+        PreparedStatement secondPreparedStatement = 
mock(PreparedStatement.class);
+        FirebirdPreparedStatementExecutionTestContext testContext = 
createFirebirdPreparedStatementExecutionTestContext("UPDATE tbl SET col = ?", 
connection);
+        when(connection.prepareStatement("UPDATE tbl SET col = 
?")).thenReturn(firstPreparedStatement, secondPreparedStatement);
+        ExecutionContext firstExecutionContext = 
createFirebirdExecutionContext(testContext, 1);
+        ExecutionContext secondExecutionContext = 
createFirebirdExecutionContext(testContext, 2);
+        DialectDatabaseMetaData dialectDatabaseMetaData = 
mock(DialectDatabaseMetaData.class, RETURNS_DEEP_STUBS);
+        try (
+                MockedConstruction<DatabaseTypeRegistry> 
mockedDatabaseTypeRegistry = mockConstruction(DatabaseTypeRegistry.class,
+                        (mock, context) -> {
+                            
when(mock.getDefaultSchemaName("foo_db")).thenReturn("foo_db");
+                            
when(mock.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData);
+                        });
+                MockedConstruction<KernelProcessor> mockedKernelProcessor = 
mockConstruction(KernelProcessor.class,
+                        (mock, context) -> 
when(mock.generateExecutionContext(any(QueryContext.class), 
any(RuleMetaData.class), any(ConfigurationProperties.class)))
+                                .thenReturn(1 == context.getCount() ? 
firstExecutionContext : secondExecutionContext));
+                MockedStatic<ShardingSphereServiceLoader> serviceLoader = 
mockStatic(ShardingSphereServiceLoader.class, CALLS_REAL_METHODS)) {
+            DatabaseProxyConnector engine = 
createFirebirdPreparedStatementConnector(testContext);
+            serviceLoader.when(() -> 
ShardingSphereServiceLoader.getServiceInstances(AdvancedProxySQLExecutor.class)).thenReturn(Collections.emptyList());
+            
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+            assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+            testContext.connectionSession.finishPreparedStatementCache();
+            
testContext.connectionSession.invalidatePreparedStatementCache(createPreparedStatementCacheKey(1));
+            
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
 is(0));
+            
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+            assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+            testContext.connectionSession.finishPreparedStatementCache();
+            assertTrue(mockedDatabaseTypeRegistry.constructed().size() >= 3);
+            assertThat(mockedKernelProcessor.constructed().size(), is(2));
+        }
+        verify(connection, times(2)).prepareStatement("UPDATE tbl SET col = 
?");
+        verify(firstPreparedStatement).clearParameters();
+        verify(secondPreparedStatement).clearParameters();
+        verify(firstPreparedStatement).setObject(1, 1);
+        verify(secondPreparedStatement).setObject(1, 2);
+        verify(firstPreparedStatement).close();
+        
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
 is(1));
+    }
+    
+    @Test
+    void assertExecuteWithNewPreparedStatementAfterClosingConnections() throws 
SQLException, BackendConnectionException {
+        Connection firstConnection = mock(Connection.class);
+        Connection secondConnection = mock(Connection.class);
+        PreparedStatement firstPreparedStatement = 
mock(PreparedStatement.class);
+        PreparedStatement secondPreparedStatement = 
mock(PreparedStatement.class);
+        FirebirdPreparedStatementExecutionTestContext testContext = 
createFirebirdPreparedStatementExecutionTestContext("UPDATE tbl SET col = ?", 
firstConnection, secondConnection);
+        when(firstConnection.prepareStatement("UPDATE tbl SET col = 
?")).thenReturn(firstPreparedStatement);
+        when(secondConnection.prepareStatement("UPDATE tbl SET col = 
?")).thenReturn(secondPreparedStatement);
+        ExecutionContext firstExecutionContext = 
createFirebirdExecutionContext(testContext, 1);
+        ExecutionContext secondExecutionContext = 
createFirebirdExecutionContext(testContext, 2);
+        DialectDatabaseMetaData dialectDatabaseMetaData = 
mock(DialectDatabaseMetaData.class, RETURNS_DEEP_STUBS);
+        try (
+                MockedConstruction<DatabaseTypeRegistry> 
mockedDatabaseTypeRegistry = mockConstruction(DatabaseTypeRegistry.class,
+                        (mock, context) -> {
+                            
when(mock.getDefaultSchemaName("foo_db")).thenReturn("foo_db");
+                            
when(mock.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData);
+                        });
+                MockedConstruction<KernelProcessor> mockedKernelProcessor = 
mockConstruction(KernelProcessor.class,
+                        (mock, context) -> 
when(mock.generateExecutionContext(any(QueryContext.class), 
any(RuleMetaData.class), any(ConfigurationProperties.class)))
+                                .thenReturn(1 == context.getCount() ? 
firstExecutionContext : secondExecutionContext));
+                MockedStatic<ShardingSphereServiceLoader> serviceLoader = 
mockStatic(ShardingSphereServiceLoader.class, CALLS_REAL_METHODS)) {
+            DatabaseProxyConnector engine = 
createFirebirdPreparedStatementConnector(testContext);
+            serviceLoader.when(() -> 
ShardingSphereServiceLoader.getServiceInstances(AdvancedProxySQLExecutor.class)).thenReturn(Collections.emptyList());
+            
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+            assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+            testContext.connectionSession.finishPreparedStatementCache();
+            
assertThat(testContext.connectionSession.getDatabaseConnectionManager().closeConnections(false),
 is(Collections.emptyList()));
+            
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
 is(0));
+            
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+            assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+            testContext.connectionSession.finishPreparedStatementCache();
+            assertTrue(mockedDatabaseTypeRegistry.constructed().size() >= 3);
+            assertThat(mockedKernelProcessor.constructed().size(), is(2));
+        }
+        verify(firstConnection).prepareStatement("UPDATE tbl SET col = ?");
+        verify(secondConnection).prepareStatement("UPDATE tbl SET col = ?");
+        verify(firstPreparedStatement).clearParameters();
+        verify(secondPreparedStatement).clearParameters();
+        verify(firstPreparedStatement).setObject(1, 1);
+        verify(secondPreparedStatement).setObject(1, 2);
+        verify(firstPreparedStatement).close();
+        verify(firstConnection).close();
+        
assertThat(testContext.connectionSession.getPreparedStatementCacheContext().size(),
 is(1));
+    }
+    
+    @Test
+    void 
assertCloseExecutionResourcesAfterHandlerCleanupAndCacheInvalidation() throws 
SQLException, BackendConnectionException {
+        Connection connection = mock(Connection.class);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        AtomicBoolean closed = new AtomicBoolean(false);
+        doAnswer(invocation -> {
+            if (closed.get()) {
+                throw new SQLException("cancel after close");
+            }
+            return null;
+        }).when(preparedStatement).cancel();
+        doAnswer(invocation -> {
+            closed.set(true);
+            return null;
+        }).when(preparedStatement).close();
+        when(preparedStatement.isClosed()).thenAnswer(invocation -> 
closed.get());
+        FirebirdPreparedStatementExecutionTestContext testContext = 
createFirebirdPreparedStatementExecutionTestContext("UPDATE tbl SET col = ?", 
connection);
+        when(connection.prepareStatement("UPDATE tbl SET col = 
?")).thenReturn(preparedStatement);
+        ExecutionContext executionContext = 
createFirebirdExecutionContext(testContext, 1);
+        DialectDatabaseMetaData dialectDatabaseMetaData = 
mock(DialectDatabaseMetaData.class, RETURNS_DEEP_STUBS);
+        try (
+                MockedConstruction<DatabaseTypeRegistry> 
mockedDatabaseTypeRegistry = mockConstruction(DatabaseTypeRegistry.class,
+                        (mock, context) -> {
+                            
when(mock.getDefaultSchemaName("foo_db")).thenReturn("foo_db");
+                            
when(mock.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData);
+                        });
+                MockedConstruction<KernelProcessor> mockedKernelProcessor = 
mockConstruction(KernelProcessor.class,
+                        (mock, context) -> 
when(mock.generateExecutionContext(any(QueryContext.class), 
any(RuleMetaData.class), 
any(ConfigurationProperties.class))).thenReturn(executionContext));
+                MockedStatic<ShardingSphereServiceLoader> serviceLoader = 
mockStatic(ShardingSphereServiceLoader.class, CALLS_REAL_METHODS)) {
+            DatabaseProxyConnector engine = 
createFirebirdPreparedStatementConnector(testContext);
+            
testContext.connectionSession.getDatabaseConnectionManager().add(engine);
+            
testContext.connectionSession.getDatabaseConnectionManager().markResourceInUse(engine);
+            serviceLoader.when(() -> 
ShardingSphereServiceLoader.getServiceInstances(AdvancedProxySQLExecutor.class)).thenReturn(Collections.emptyList());
+            
testContext.connectionSession.beginPreparedStatementCache(createPreparedStatementCacheKey(1));
+            assertThat(engine.execute(), isA(UpdateResponseHeader.class));
+            testContext.connectionSession.finishPreparedStatementCache();
+            
testContext.connectionSession.getDatabaseConnectionManager().removeResource(engine);
+            engine.close();
+            
testContext.connectionSession.invalidatePreparedStatementCache(createPreparedStatementCacheKey(1));
+            
testContext.connectionSession.getDatabaseConnectionManager().closeExecutionResources();
+            assertTrue(mockedDatabaseTypeRegistry.constructed().size() >= 2);
+            assertThat(mockedKernelProcessor.constructed().size(), is(1));
+        }
+        verify(connection).prepareStatement("UPDATE tbl SET col = ?");
+        verify(preparedStatement, never()).cancel();
+        verify(preparedStatement).close();
+        assertTrue(closed.get());
+    }
+    
     @Test
     void assertExecuteWithQueryResult() throws SQLException {
         SQLStatementContext sqlStatementContext = 
createSQLStatementContext(new SQLStatement(databaseType));
@@ -727,10 +925,10 @@ class StandardDatabaseProxyConnectorTest {
     }
     
     private ShardingSphereResultSetMetaData createResultSetMetaData() throws 
SQLException {
-        ShardingSphereResultSetMetaData result = 
mock(ShardingSphereResultSetMetaData.class);
-        when(result.getColumnLabel(1)).thenReturn("order_id");
-        when(result.getColumnName(1)).thenReturn("order_id");
-        return result;
+        ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
+        when(resultSetMetaData.getColumnLabel(1)).thenReturn("order_id");
+        when(resultSetMetaData.getColumnName(1)).thenReturn("order_id");
+        return new ShardingSphereResultSetMetaData(resultSetMetaData, 
mock(ShardingSphereDatabase.class), null);
     }
     
     private ResponseHeader executeWithImplicitCommitCondition(final 
SQLStatement sqlStatement, final String transactionType, final boolean 
inTransaction,
@@ -803,6 +1001,22 @@ class StandardDatabaseProxyConnectorTest {
         assertTrue(cachedStatements.isEmpty());
     }
     
+    @Test
+    void assertCloseSkipCachedPreparedStatement() throws SQLException {
+        SQLStatementContext sqlStatementContext = 
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+        
when(sqlStatementContext.getTablesContext().getDatabaseNames()).thenReturn(Collections.emptyList());
+        
when(sqlStatementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
+        DatabaseProxyConnector engine = 
createDatabaseProxyConnector(JDBCDriverType.STATEMENT, 
createQueryContext(sqlStatementContext, mockDatabase()));
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        Collection<Statement> cachedStatements = getField(engine, 
"cachedStatements");
+        cachedStatements.add(preparedStatement);
+        
when(databaseConnectionManager.getConnectionSession().getPreparedStatementCacheContext().contains(preparedStatement)).thenReturn(true);
+        engine.close();
+        verify(preparedStatement, never()).cancel();
+        verify(preparedStatement, never()).close();
+        assertTrue(cachedStatements.isEmpty());
+    }
+    
     @Test
     void assertCloseResultSetsWithExceptionThrown() throws SQLException {
         SQLStatementContext sqlStatementContext = 
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
@@ -899,6 +1113,73 @@ class StandardDatabaseProxyConnectorTest {
         return result;
     }
     
+    private ConnectionSession createFirebirdConnectionSession(final 
ContextManager contextManager) {
+        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        ConnectionSession result = new ConnectionSession(firebirdDatabaseType, 
null);
+        result.setGrantee(new Grantee("foo_user"));
+        result.setCurrentDatabaseName("foo_db");
+        
result.getConnectionContext().getTransactionContext().beginTransaction("XA", 
null);
+        result.getTransactionStatus().setInTransaction(true);
+        return result;
+    }
+    
+    private FirebirdPreparedStatementExecutionTestContext 
createFirebirdPreparedStatementExecutionTestContext(final String sql, final 
Connection... connections) throws SQLException {
+        ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
+        ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class, 
RETURNS_DEEP_STUBS);
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, 
RETURNS_DEEP_STUBS);
+        StorageUnit storageUnit = mock(StorageUnit.class);
+        DatabaseType storageDatabaseType = mock(DatabaseType.class);
+        JDBCBackendDataSource backendDataSource = 
mock(JDBCBackendDataSource.class);
+        when(metaData.containsDatabase("foo_db")).thenReturn(true);
+        when(metaData.getDatabase("foo_db")).thenReturn(database);
+        
when(metaData.getAllDatabases()).thenReturn(Collections.singleton(database));
+        
when(metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE)).thenReturn(1);
+        
when(metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY)).thenReturn(1);
+        when(metaData.getGlobalRuleMetaData()).thenReturn(new 
RuleMetaData(Collections.singletonList(sqlFederationRule)));
+        when(database.getName()).thenReturn("foo_db");
+        when(database.containsDataSource()).thenReturn(true);
+        when(database.isComplete()).thenReturn(true);
+        when(database.getProtocolType()).thenReturn(firebirdDatabaseType);
+        
when(database.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
+        
when(database.getRuleMetaData().getAttributes(DataNodeRuleAttribute.class)).thenReturn(Collections.emptyList());
+        
when(database.getResourceMetaData().getStorageUnits()).thenReturn(Collections.singletonMap("ds",
 storageUnit));
+        when(storageDatabaseType.getType()).thenReturn("Firebird");
+        when(storageUnit.getStorageType()).thenReturn(storageDatabaseType);
+        
when(contextManager.getMetaDataContexts().getMetaData()).thenReturn(metaData);
+        when(contextManager.getDatabase("foo_db")).thenReturn(database);
+        
when(ProxyContext.getInstance().getBackendDataSource()).thenReturn(backendDataSource);
+        if (1 == connections.length) {
+            when(backendDataSource.getConnections(eq("foo_db"), eq("ds"), 
eq(1), any())).thenReturn(Collections.singletonList(connections[0]));
+        } else {
+            when(backendDataSource.getConnections(eq("foo_db"), eq("ds"), 
eq(1), any())).thenReturn(Collections.singletonList(connections[0]), 
Collections.singletonList(connections[1]));
+        }
+        ConnectionSession connectionSession = 
createFirebirdConnectionSession(contextManager);
+        SQLStatementContext sqlStatementContext = 
createSQLStatementContext(new SQLStatement(firebirdDatabaseType));
+        return new 
FirebirdPreparedStatementExecutionTestContext(connectionSession, metaData, 
sqlStatementContext,
+                new QueryContext(sqlStatementContext, sql, 
Collections.singletonList(1), new HintValueContext(), 
connectionSession.getConnectionContext(), metaData), sql);
+    }
+    
+    private ExecutionContext createFirebirdExecutionContext(final 
FirebirdPreparedStatementExecutionTestContext testContext, final int parameter) 
{
+        return new ExecutionContext(new 
QueryContext(testContext.sqlStatementContext, testContext.sql, 
Collections.singletonList(parameter),
+                new HintValueContext(), 
testContext.connectionSession.getConnectionContext(), testContext.metaData),
+                Collections.singletonList(new ExecutionUnit("ds", new 
SQLUnit(testContext.sql, Collections.singletonList(parameter)))), mock());
+    }
+    
+    private PreparedStatementCacheKey createPreparedStatementCacheKey(final 
int statementId) {
+        return new PreparedStatementCacheKey("firebird:" + statementId);
+    }
+    
+    private DatabaseProxyConnector 
createFirebirdPreparedStatementConnector(final 
FirebirdPreparedStatementExecutionTestContext testContext) throws SQLException {
+        DatabaseProxyConnector result = new StandardDatabaseProxyConnector(
+                JDBCDriverType.PREPARED_STATEMENT, testContext.queryContext, 
testContext.connectionSession.getDatabaseConnectionManager());
+        ProxySQLExecutor proxySQLExecutor = getField(result, 
"proxySQLExecutor");
+        ProxyJDBCExecutor proxyJDBCExecutor = mock(ProxyJDBCExecutor.class);
+        List<ExecuteResult> executeResults = Collections.singletonList(new 
UpdateResult(1, 0L));
+        when(proxyJDBCExecutor.execute(any(), any(), eq(false), 
anyBoolean())).thenReturn(executeResults);
+        setProxySQLExecutorField(proxySQLExecutor, "regularExecutor", 
proxyJDBCExecutor);
+        return result;
+    }
+    
     @SuppressWarnings("unchecked")
     @SneakyThrows(ReflectiveOperationException.class)
     private <T> T getField(final DatabaseProxyConnector target, final String 
fieldName) {
@@ -909,4 +1190,31 @@ class StandardDatabaseProxyConnectorTest {
     private void setField(final DatabaseProxyConnector target, final String 
fieldName, final Object value) {
         
Plugins.getMemberAccessor().set(StandardDatabaseProxyConnector.class.getDeclaredField(fieldName),
 target, value);
     }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setProxySQLExecutorField(final ProxySQLExecutor target, final 
String fieldName, final Object value) {
+        
Plugins.getMemberAccessor().set(ProxySQLExecutor.class.getDeclaredField(fieldName),
 target, value);
+    }
+    
+    private static final class FirebirdPreparedStatementExecutionTestContext {
+        
+        private final ConnectionSession connectionSession;
+        
+        private final ShardingSphereMetaData metaData;
+        
+        private final SQLStatementContext sqlStatementContext;
+        
+        private final QueryContext queryContext;
+        
+        private final String sql;
+        
+        private FirebirdPreparedStatementExecutionTestContext(final 
ConnectionSession connectionSession, final ShardingSphereMetaData metaData,
+                                                              final 
SQLStatementContext sqlStatementContext, final QueryContext queryContext, final 
String sql) {
+            this.connectionSession = connectionSession;
+            this.metaData = metaData;
+            this.sqlStatementContext = sqlStatementContext;
+            this.queryContext = queryContext;
+            this.sql = sql;
+        }
+    }
 }
diff --git 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatementTest.java
 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatementTest.java
index dbc5c999342..083d425c612 100644
--- 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatementTest.java
+++ 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatementTest.java
@@ -25,10 +25,16 @@ import 
org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheContext;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -38,10 +44,12 @@ import java.sql.Types;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.stream.Stream;
 
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
@@ -54,7 +62,8 @@ class JDBCBackendStatementTest {
     
     @Test
     void 
assertCreateStorageResourceWithStatementSetsFetchSizeOnlyInMemoryStrictly() 
throws SQLException {
-        JDBCBackendStatement backendStatement = new JDBCBackendStatement();
+        ConnectionSession connectionSession = mock(ConnectionSession.class);
+        JDBCBackendStatement backendStatement = new 
JDBCBackendStatement(connectionSession);
         Connection connection = mock(Connection.class);
         Statement statement = mock(Statement.class);
         when(connection.createStatement()).thenReturn(statement);
@@ -70,7 +79,9 @@ class JDBCBackendStatementTest {
     
     @Test
     void assertCreateStorageResourceWithPreparedStatement() throws 
SQLException {
-        JDBCBackendStatement backendStatement = new JDBCBackendStatement();
+        ConnectionSession connectionSession = mock(ConnectionSession.class);
+        
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.empty());
+        JDBCBackendStatement backendStatement = new 
JDBCBackendStatement(connectionSession);
         Connection connection = mock(Connection.class);
         PreparedStatement preparedStatementWithGeneratedKeys = 
mock(PreparedStatement.class);
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
@@ -90,4 +101,95 @@ class JDBCBackendStatementTest {
                 new ExecutionUnit("ds", new SQLUnit(sql, 
Collections.emptyList())), connection, 0, ConnectionMode.CONNECTION_STRICTLY, 
new StatementOption(false), databaseType);
         assertThat(actualWithoutGeneratedKeys, is(preparedStatement));
     }
+    
+    @Test
+    void 
assertCreateStorageResourceReusesPreparedStatementForSamePreparedStatementCacheKey()
 throws SQLException {
+        ConnectionSession connectionSession = mock(ConnectionSession.class);
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+        
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.of(new
 PreparedStatementCacheKey("statement-1")));
+        JDBCBackendStatement backendStatement = new 
JDBCBackendStatement(connectionSession);
+        Connection connection = mock(Connection.class);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        String sql = "SELECT * FROM foo WHERE id = ?";
+        when(connection.prepareStatement(sql)).thenReturn(preparedStatement);
+        StatementOption statementOption = new StatementOption(false);
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit(sql, 
Collections.singletonList(1))), connection, 0, 
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit(sql, 
Collections.singletonList(2))), connection, 0, 
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+        verify(connection).prepareStatement(sql);
+        verify(preparedStatement, times(2)).clearParameters();
+        verify(preparedStatement).setObject(1, 1);
+        verify(preparedStatement).setObject(1, 2);
+    }
+    
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("nonFirebirdDatabaseTypes")
+    void 
assertCreateStorageResourceWithoutPreparedStatementCacheWithoutActiveKey(final 
String scenario, final String databaseTypeName) throws SQLException {
+        ConnectionSession connectionSession = mock(ConnectionSession.class);
+        
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.empty());
+        JDBCBackendStatement backendStatement = new 
JDBCBackendStatement(connectionSession);
+        Connection connection = mock(Connection.class);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        String sql = "SELECT * FROM foo WHERE id = ?";
+        DatabaseType nonFirebirdDatabaseType = mock(DatabaseType.class);
+        when(connection.prepareStatement(sql)).thenReturn(preparedStatement);
+        StatementOption statementOption = new StatementOption(false);
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit(sql, 
Collections.singletonList(1))), connection, 0, 
ConnectionMode.CONNECTION_STRICTLY, statementOption, nonFirebirdDatabaseType);
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit(sql, 
Collections.singletonList(2))), connection, 0, 
ConnectionMode.CONNECTION_STRICTLY, statementOption, nonFirebirdDatabaseType);
+        verify(connection, times(2)).prepareStatement(sql);
+    }
+    
+    @Test
+    void assertCreateStorageResourceWithDifferentPreparedStatementCacheKey() 
throws SQLException {
+        ConnectionSession connectionSession = mock(ConnectionSession.class);
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+        JDBCBackendStatement backendStatement = new 
JDBCBackendStatement(connectionSession);
+        Connection connection = mock(Connection.class);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        String sql = "SELECT * FROM foo WHERE id = ?";
+        when(connection.prepareStatement(sql)).thenReturn(preparedStatement);
+        StatementOption statementOption = new StatementOption(false);
+        
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(
+                Optional.of(new PreparedStatementCacheKey("statement-1")), 
Optional.of(new PreparedStatementCacheKey("statement-2")));
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit(sql, 
Collections.singletonList(1))), connection, 0, 
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit(sql, 
Collections.singletonList(2))), connection, 0, 
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+        verify(connection, times(2)).prepareStatement(sql);
+    }
+    
+    @Test
+    void assertCreateNewPreparedStatementAfterCacheInvalidation() throws 
SQLException {
+        ConnectionSession connectionSession = mock(ConnectionSession.class);
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        PreparedStatementCacheKey preparedStatementCacheKey = new 
PreparedStatementCacheKey("statement-1");
+        
when(connectionSession.getPreparedStatementCacheContext()).thenReturn(cacheContext);
+        
when(connectionSession.getCurrentPreparedStatementCacheKey()).thenReturn(Optional.of(preparedStatementCacheKey));
+        JDBCBackendStatement backendStatement = new 
JDBCBackendStatement(connectionSession);
+        Connection connection = mock(Connection.class);
+        PreparedStatement firstPreparedStatement = 
mock(PreparedStatement.class);
+        PreparedStatement secondPreparedStatement = 
mock(PreparedStatement.class);
+        String sql = "SELECT * FROM foo WHERE id = ?";
+        
when(connection.prepareStatement(sql)).thenReturn(firstPreparedStatement, 
secondPreparedStatement);
+        StatementOption statementOption = new StatementOption(false);
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit(sql, 
Collections.singletonList(1))), connection, 0, 
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+        cacheContext.invalidate(preparedStatementCacheKey);
+        backendStatement.createStorageResource(
+                new ExecutionUnit("ds", new SQLUnit(sql, 
Collections.singletonList(2))), connection, 0, 
ConnectionMode.CONNECTION_STRICTLY, statementOption, databaseType);
+        verify(connection, times(2)).prepareStatement(sql);
+        verify(firstPreparedStatement).close();
+    }
+    
+    private static Stream<Arguments> nonFirebirdDatabaseTypes() {
+        return Stream.of(
+                
Arguments.of("createStorageResource_mysqlDoesNotUsePreparedStatementCache", 
"MySQL"),
+                
Arguments.of("createStorageResource_postgresqlDoesNotUsePreparedStatementCache",
 "PostgreSQL"),
+                
Arguments.of("createStorageResource_openGaussDoesNotUsePreparedStatementCache", 
"openGauss"));
+    }
 }
diff --git 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
index 4a9fb7bc15f..6f52a4613ea 100644
--- 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
+++ 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java
@@ -36,7 +36,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.internal.configuration.plugins.Plugins;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.is;
@@ -164,6 +168,26 @@ class ConnectionSessionTest {
         assertNull(connectionSession.getQueryContext());
     }
     
+    @Test
+    void assertPreparedStatementCacheContext() {
+        assertNotNull(connectionSession.getPreparedStatementCacheContext());
+    }
+    
+    @Test
+    void assertPreparedStatementCacheLifecycle() throws SQLException {
+        PreparedStatementCacheKey preparedStatementCacheKey = new 
PreparedStatementCacheKey("statement-1");
+        
connectionSession.beginPreparedStatementCache(preparedStatementCacheKey);
+        assertThat(connectionSession.getCurrentPreparedStatementCacheKey(), 
is(Optional.of(preparedStatementCacheKey)));
+        Connection connection = mock(Connection.class);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        
connectionSession.getPreparedStatementCacheContext().getOrCreate(connection, 
"SELECT 1", false, preparedStatementCacheKey, () -> preparedStatement);
+        
connectionSession.invalidatePreparedStatementCache(preparedStatementCacheKey);
+        
assertFalse(connectionSession.getPreparedStatementCacheContext().contains(preparedStatement));
+        connectionSession.finishPreparedStatementCache();
+        assertThat(connectionSession.getCurrentPreparedStatementCacheKey(), 
is(Optional.empty()));
+        verify(preparedStatement).close();
+    }
+    
     @SuppressWarnings("unchecked")
     private AtomicReference<ConnectionContext> getConnectionContextReference() 
throws ReflectiveOperationException {
         return (AtomicReference<ConnectionContext>) 
Plugins.getMemberAccessor().get(ConnectionSession.class.getDeclaredField("connectionContext"),
 connectionSession);
diff --git 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContextTest.java
 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContextTest.java
new file mode 100644
index 00000000000..219ff9f1aae
--- /dev/null
+++ 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheContextTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class PreparedStatementCacheContextTest {
+    
+    @Test
+    void assertGetOrCreateWithSamePreparedStatementCacheKey() throws 
SQLException {
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        Connection connection = mock(Connection.class);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        PreparedStatementCacheKey preparedStatementCacheKey = new 
PreparedStatementCacheKey("statement-1");
+        int[] invokedCount = {0};
+        PreparedStatement actualFirst = cacheContext.getOrCreate(connection, 
"SELECT 1", false, preparedStatementCacheKey, () -> {
+            invokedCount[0]++;
+            return preparedStatement;
+        });
+        PreparedStatement actualSecond = cacheContext.getOrCreate(connection, 
"SELECT 1", false, preparedStatementCacheKey, () -> {
+            invokedCount[0]++;
+            return mock(PreparedStatement.class);
+        });
+        assertThat(actualFirst, is(preparedStatement));
+        assertThat(actualSecond, is(preparedStatement));
+        assertThat(invokedCount[0], is(1));
+        assertTrue(cacheContext.contains(preparedStatement));
+    }
+    
+    @Test
+    void assertGetOrCreateWithDifferentPreparedStatementCacheKey() throws 
SQLException {
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        Connection connection = mock(Connection.class);
+        PreparedStatement firstPreparedStatement = 
mock(PreparedStatement.class);
+        PreparedStatement secondPreparedStatement = 
mock(PreparedStatement.class);
+        PreparedStatement actualFirst = cacheContext.getOrCreate(connection, 
"SELECT 1", false, new PreparedStatementCacheKey("statement-1"), () -> 
firstPreparedStatement);
+        PreparedStatement actualSecond = cacheContext.getOrCreate(connection, 
"SELECT 1", false, new PreparedStatementCacheKey("statement-2"), () -> 
secondPreparedStatement);
+        assertThat(actualFirst, is(firstPreparedStatement));
+        assertThat(actualSecond, is(secondPreparedStatement));
+    }
+    
+    @Test
+    void assertEvictWithLRU() throws SQLException {
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(2);
+        Connection connection = mock(Connection.class);
+        PreparedStatement first = mock(PreparedStatement.class);
+        PreparedStatement second = mock(PreparedStatement.class);
+        PreparedStatement third = mock(PreparedStatement.class);
+        cacheContext.getOrCreate(connection, "SELECT 1", false, new 
PreparedStatementCacheKey("statement-1"), () -> first);
+        cacheContext.getOrCreate(connection, "SELECT 2", false, new 
PreparedStatementCacheKey("statement-2"), () -> second);
+        cacheContext.getOrCreate(connection, "SELECT 1", false, new 
PreparedStatementCacheKey("statement-1"), () -> first);
+        cacheContext.getOrCreate(connection, "SELECT 3", false, new 
PreparedStatementCacheKey("statement-3"), () -> third);
+        assertFalse(cacheContext.contains(second));
+        assertTrue(cacheContext.contains(first));
+        assertTrue(cacheContext.contains(third));
+        verify(second).close();
+    }
+    
+    @Test
+    void assertInvalidate() throws SQLException {
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        Connection connection = mock(Connection.class);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        cacheContext.getOrCreate(connection, "SELECT 1", false, new 
PreparedStatementCacheKey("statement-1"), () -> preparedStatement);
+        cacheContext.invalidate(preparedStatement);
+        assertFalse(cacheContext.contains(preparedStatement));
+        verify(preparedStatement).close();
+    }
+    
+    @Test
+    void assertInvalidateByPreparedStatementCacheKey() throws SQLException {
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        Connection connection = mock(Connection.class);
+        PreparedStatement first = mock(PreparedStatement.class);
+        PreparedStatement second = mock(PreparedStatement.class);
+        PreparedStatementCacheKey firstPreparedStatementCacheKey = new 
PreparedStatementCacheKey("statement-1");
+        PreparedStatementCacheKey secondPreparedStatementCacheKey = new 
PreparedStatementCacheKey("statement-2");
+        cacheContext.getOrCreate(connection, "SELECT 1", false, 
firstPreparedStatementCacheKey, () -> first);
+        cacheContext.getOrCreate(connection, "SELECT 1", false, 
secondPreparedStatementCacheKey, () -> second);
+        cacheContext.invalidate(firstPreparedStatementCacheKey);
+        assertFalse(cacheContext.contains(first));
+        assertTrue(cacheContext.contains(second));
+        verify(first).close();
+    }
+    
+    @Test
+    void assertGetOrCreateWithClosedPreparedStatement() throws SQLException {
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        Connection connection = mock(Connection.class);
+        PreparedStatement firstPreparedStatement = 
mock(PreparedStatement.class);
+        PreparedStatement secondPreparedStatement = 
mock(PreparedStatement.class);
+        when(firstPreparedStatement.isClosed()).thenReturn(true);
+        PreparedStatementCacheKey preparedStatementCacheKey = new 
PreparedStatementCacheKey("statement-1");
+        cacheContext.getOrCreate(connection, "SELECT 1", false, 
preparedStatementCacheKey, () -> firstPreparedStatement);
+        PreparedStatement actualPreparedStatement = 
cacheContext.getOrCreate(connection, "SELECT 1", false, 
preparedStatementCacheKey, () -> secondPreparedStatement);
+        assertThat(actualPreparedStatement, is(secondPreparedStatement));
+        verify(firstPreparedStatement).close();
+    }
+    
+    @Test
+    void assertCloseAll() throws SQLException {
+        PreparedStatementCacheContext cacheContext = new 
PreparedStatementCacheContext(8);
+        Connection connection = mock(Connection.class);
+        PreparedStatement first = mock(PreparedStatement.class);
+        PreparedStatement second = mock(PreparedStatement.class);
+        cacheContext.getOrCreate(connection, "SELECT 1", false, new 
PreparedStatementCacheKey("statement-1"), () -> first);
+        cacheContext.getOrCreate(connection, "SELECT 2", false, new 
PreparedStatementCacheKey("statement-2"), () -> second);
+        cacheContext.closeAll();
+        assertThat(cacheContext.size(), is(0));
+        verify(first).close();
+        verify(second).close();
+    }
+}
diff --git 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKeyTest.java
 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKeyTest.java
new file mode 100644
index 00000000000..302876f2b8a
--- /dev/null
+++ 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementCacheKeyTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class PreparedStatementCacheKeyTest {
+    
+    @Test
+    void assertGetToken() {
+        PreparedStatementCacheKey preparedStatementCacheKey = new 
PreparedStatementCacheKey("statement-1");
+        assertThat(preparedStatementCacheKey.getToken(), is("statement-1"));
+        assertThat(preparedStatementCacheKey, is(new 
PreparedStatementCacheKey("statement-1")));
+    }
+    
+    @Test
+    void assertConstructorWithNullToken() {
+        NullPointerException actual = assertThrows(NullPointerException.class, 
() -> new PreparedStatementCacheKey(null));
+        assertThat(actual.getMessage(), is("token cannot be null."));
+    }
+}
diff --git 
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleaner.java
 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleaner.java
new file mode 100644
index 00000000000..917bdbbf413
--- /dev/null
+++ 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleaner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementCacheKey;
+import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
+
+import java.sql.SQLException;
+
+/**
+ * Firebird statement resource cleaner.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class FirebirdStatementResourceCleaner {
+    
+    private static final String PREPARED_STATEMENT_CACHE_KEY_PREFIX = 
"firebird:";
+    
+    /**
+     * Create prepared statement cache key.
+     *
+     * @param statementId statement ID
+     * @return prepared statement cache key
+     */
+    public static PreparedStatementCacheKey 
createPreparedStatementCacheKey(final int statementId) {
+        return new 
PreparedStatementCacheKey(PREPARED_STATEMENT_CACHE_KEY_PREFIX + statementId);
+    }
+    
+    /**
+     * Clean Firebird statement resources.
+     *
+     * @param connectionSession connection session
+     * @param statementId statement ID
+     * @param invalidatePreparedStatementCache whether invalidate prepared 
statement cache
+     * @throws SQLException SQL exception
+     */
+    public static void clean(final ConnectionSession connectionSession, final 
int statementId, final boolean invalidatePreparedStatementCache) throws 
SQLException {
+        ProxyBackendHandler proxyBackendHandler = 
FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(connectionSession.getConnectionId(),
 statementId);
+        if (null != proxyBackendHandler) {
+            
connectionSession.getDatabaseConnectionManager().removeResource(proxyBackendHandler);
+            
FirebirdFetchStatementCache.getInstance().unregisterStatement(connectionSession.getConnectionId(),
 statementId);
+            connectionSession.getConnectionContext().clearCursorContext();
+            try {
+                proxyBackendHandler.close();
+            } finally {
+                invalidatePreparedStatementCacheIfNecessary(connectionSession, 
statementId, invalidatePreparedStatementCache);
+            }
+            return;
+        }
+        
FirebirdFetchStatementCache.getInstance().unregisterStatement(connectionSession.getConnectionId(),
 statementId);
+        connectionSession.getConnectionContext().clearCursorContext();
+        invalidatePreparedStatementCacheIfNecessary(connectionSession, 
statementId, invalidatePreparedStatementCache);
+    }
+    
+    private static void invalidatePreparedStatementCacheIfNecessary(final 
ConnectionSession connectionSession, final int statementId,
+                                                                    final 
boolean invalidatePreparedStatementCache) {
+        if (invalidatePreparedStatementCache) {
+            
connectionSession.invalidatePreparedStatementCache(createPreparedStatementCacheKey(statementId));
+        }
+    }
+}
diff --git 
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutor.java
 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutor.java
index 21b7410dbbd..890b941b1c6 100644
--- 
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutor.java
+++ 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutor.java
@@ -44,6 +44,7 @@ import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.FirebirdServerPreparedStatement;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.blob.upload.FirebirdBlobUploadCache;
+import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
 
 import java.sql.SQLException;
@@ -70,9 +71,30 @@ public final class FirebirdExecuteStatementCommandExecutor 
implements CommandExe
     
     @Override
     public Collection<DatabasePacket> execute() throws SQLException {
-        FirebirdServerPreparedStatement preparedStatement = 
connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
-        List<Object> params = packet.getParameterValues();
-        final List<Long> blobIdsToRemove = bindBlobParameters(params);
+        
connectionSession.beginPreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(packet.getStatementId()));
+        try {
+            FirebirdServerPreparedStatement preparedStatement = 
connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
+            ResponseHeader responseHeader = 
executePreparedStatement(preparedStatement, packet.getParameterValues());
+            if (responseHeader instanceof QueryResponseHeader) {
+                responseType = ResponseType.QUERY;
+                
FirebirdFetchStatementCache.getInstance().registerStatement(connectionSession.getConnectionId(),
 packet.getStatementId(), proxyBackendHandler);
+                
connectionSession.getDatabaseConnectionManager().markResourceInUse(proxyBackendHandler);
+            } else {
+                responseType = ResponseType.UPDATE;
+            }
+            Collection<DatabasePacket> result = new LinkedList<>();
+            if (packet.isStoredProcedure() && proxyBackendHandler.next()) {
+                result.add(getSQLResponse());
+            }
+            result.add(new FirebirdGenericResponsePacket());
+            return result;
+        } finally {
+            connectionSession.finishPreparedStatementCache();
+        }
+    }
+    
+    private ResponseHeader executePreparedStatement(final 
FirebirdServerPreparedStatement preparedStatement, final List<Object> params) 
throws SQLException {
+        List<Long> blobIdsToRemove = bindBlobParameters(params);
         SQLStatementContext sqlStatementContext = 
preparedStatement.getSqlStatementContext();
         if (sqlStatementContext instanceof ParameterAware) {
             ((ParameterAware) sqlStatementContext).bindParameters(params);
@@ -80,22 +102,10 @@ public final class FirebirdExecuteStatementCommandExecutor 
implements CommandExe
         QueryContext queryContext = new QueryContext(sqlStatementContext, 
preparedStatement.getSql(), params, preparedStatement.getHintValueContext(), 
connectionSession.getConnectionContext(),
                 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData(),
 true);
         proxyBackendHandler = 
ProxyBackendHandlerFactory.newInstance(TypedSPILoader.getService(DatabaseType.class,
 "Firebird"), queryContext, connectionSession, true);
-        ResponseHeader responseHeader = proxyBackendHandler.execute();
-        if (responseHeader instanceof QueryResponseHeader) {
-            responseType = ResponseType.QUERY;
-            
FirebirdFetchStatementCache.getInstance().registerStatement(connectionSession.getConnectionId(),
 packet.getStatementId(), proxyBackendHandler);
-            
connectionSession.getDatabaseConnectionManager().markResourceInUse(proxyBackendHandler);
-        } else {
-            responseType = ResponseType.UPDATE;
-        }
-        if (responseHeader instanceof UpdateResponseHeader) {
+        ResponseHeader result = proxyBackendHandler.execute();
+        if (result instanceof UpdateResponseHeader) {
             clearBlobUploads(blobIdsToRemove);
         }
-        Collection<DatabasePacket> result = new LinkedList<>();
-        if (packet.isStoredProcedure() && proxyBackendHandler.next()) {
-            result.add(getSQLResponse());
-        }
-        result.add(new FirebirdGenericResponsePacket());
         return result;
     }
     
diff --git 
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCache.java
 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCache.java
index 8f3e3320c48..3f62a03f073 100644
--- 
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCache.java
+++ 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCache.java
@@ -61,7 +61,10 @@ public final class FirebirdFetchStatementCache {
      * @param proxyBackendHandler proxy backend handler
      */
     public void registerStatement(final int connectionId, final int 
statementId, final ProxyBackendHandler proxyBackendHandler) {
-        statementRegistry.get(connectionId).put(statementId, 
proxyBackendHandler);
+        Map<Integer, ProxyBackendHandler> statements = 
statementRegistry.get(connectionId);
+        if (null != statements) {
+            statements.put(statementId, proxyBackendHandler);
+        }
     }
     
     /**
@@ -72,7 +75,8 @@ public final class FirebirdFetchStatementCache {
      * @return fetch response packets
      */
     public ProxyBackendHandler getFetchBackendHandler(final int connectionId, 
final int statementId) {
-        return statementRegistry.get(connectionId).get(statementId);
+        Map<Integer, ProxyBackendHandler> statements = 
statementRegistry.get(connectionId);
+        return null == statements ? null : statements.get(statementId);
     }
     
     /**
@@ -82,7 +86,10 @@ public final class FirebirdFetchStatementCache {
      * @param statementId statement ID
      */
     public void unregisterStatement(final int connectionId, final int 
statementId) {
-        statementRegistry.get(connectionId).remove(statementId);
+        Map<Integer, ProxyBackendHandler> statements = 
statementRegistry.get(connectionId);
+        if (null != statements) {
+            statements.remove(statementId);
+        }
     }
     
     /**
diff --git 
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutor.java
 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutor.java
index 3d050685603..0c0eeed85ed 100644
--- 
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutor.java
+++ 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutor.java
@@ -22,11 +22,11 @@ import 
org.apache.shardingsphere.database.protocol.firebird.exception.FirebirdPr
 import 
org.apache.shardingsphere.database.protocol.firebird.packet.command.query.statement.FirebirdFreeStatementPacket;
 import 
org.apache.shardingsphere.database.protocol.firebird.packet.generic.FirebirdGenericResponsePacket;
 import org.apache.shardingsphere.database.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
+import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
 
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -41,17 +41,15 @@ public final class FirebirdFreeStatementCommandExecutor 
implements CommandExecut
     private final ConnectionSession connectionSession;
     
     @Override
-    public Collection<DatabasePacket> execute() {
+    public Collection<DatabasePacket> execute() throws SQLException {
         switch (packet.getOption()) {
             case FirebirdFreeStatementPacket.DROP:
             case FirebirdFreeStatementPacket.UNPREPARE:
                 
connectionSession.getServerPreparedStatementRegistry().removePreparedStatement(packet.getStatementId());
+                FirebirdStatementResourceCleaner.clean(connectionSession, 
packet.getStatementId(), true);
                 break;
             case FirebirdFreeStatementPacket.CLOSE:
-                connectionSession.getConnectionContext().clearCursorContext();
-                ProxyBackendHandler proxyBackendHandler = 
FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(connectionSession.getConnectionId(),
 packet.getStatementId());
-                
connectionSession.getDatabaseConnectionManager().unmarkResourceInUse(proxyBackendHandler);
-                
FirebirdFetchStatementCache.getInstance().unregisterStatement(connectionSession.getConnectionId(),
 packet.getStatementId());
+                FirebirdStatementResourceCleaner.clean(connectionSession, 
packet.getStatementId(), false);
                 break;
             default:
                 throw new FirebirdProtocolException("Unknown DSQL option type 
%d", packet.getOption());
diff --git 
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutor.java
 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutor.java
index 6fb4cf809cb..0d98b257ed2 100644
--- 
a/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutor.java
+++ 
b/proxy/frontend/dialect/firebird/src/main/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutor.java
@@ -57,6 +57,7 @@ import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.FirebirdServerPreparedStatement;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.blob.metadata.FirebirdBlobColumnMetaData;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.blob.metadata.FirebirdBlobColumnMetaDataResolver;
+import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementIdGenerator;
 import 
org.apache.shardingsphere.sql.parser.statement.core.enums.TableSourceType;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.ReturningSegment;
@@ -85,6 +86,7 @@ import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.tcl.Ro
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.tcl.SavepointStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
 
+import java.sql.SQLException;
 import java.sql.Types;
 import java.util.Collection;
 import java.util.Collections;
@@ -106,15 +108,19 @@ public final class 
FirebirdPrepareStatementCommandExecutor implements CommandExe
     private ReturningSegment returningSegment;
     
     @Override
-    public Collection<DatabasePacket> execute() {
+    public Collection<DatabasePacket> execute() throws SQLException {
         MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         SQLParserRule sqlParserRule = 
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
         DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "Firebird");
         SQLStatement sqlStatement = 
sqlParserRule.getSQLParserEngine(databaseType).parse(packet.getSQL(), true);
         SQLStatementContext sqlStatementContext = new SQLBindEngine(
                 metaDataContexts.getMetaData(), 
connectionSession.getCurrentDatabaseName(), 
packet.getHintValueContext()).bind(sqlStatement);
+        int statementId = getStatementId();
+        if (packet.isValidStatementHandle()) {
+            FirebirdStatementResourceCleaner.clean(connectionSession, 
statementId, true);
+        }
         FirebirdServerPreparedStatement serverPreparedStatement = new 
FirebirdServerPreparedStatement(packet.getSQL(), sqlStatementContext, 
packet.getHintValueContext());
-        
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(getStatementId(),
 serverPreparedStatement);
+        
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(statementId,
 serverPreparedStatement);
         return createResponse(sqlStatementContext, metaDataContexts);
     }
     
diff --git 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleanerTest.java
 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleanerTest.java
new file mode 100644
index 00000000000..c3d26827a41
--- /dev/null
+++ 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/FirebirdStatementResourceCleanerTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement;
+
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import 
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
+import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
+import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings(ProxyContext.class)
+class FirebirdStatementResourceCleanerTest {
+    
+    private static final int CONNECTION_ID = 1;
+    
+    private static final int STATEMENT_ID = 2;
+    
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ConnectionSession connectionSession;
+    
+    @Mock
+    private ProxyDatabaseConnectionManager connectionManager;
+    
+    @Mock
+    private ConnectionContext connectionContext;
+    
+    @Mock
+    private ProxyBackendHandler proxyBackendHandler;
+    
+    @AfterEach
+    void tearDown() {
+        
FirebirdFetchStatementCache.getInstance().unregisterStatement(CONNECTION_ID, 
STATEMENT_ID);
+        
FirebirdFetchStatementCache.getInstance().unregisterConnection(CONNECTION_ID);
+    }
+    
+    @Test
+    void assertCreatePreparedStatementCacheKey() {
+        
assertThat(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID).getToken(),
 is("firebird:2"));
+    }
+    
+    @Test
+    void assertCleanWithPreparedStatementCacheInvalidation() throws 
SQLException {
+        
FirebirdFetchStatementCache.getInstance().registerConnection(CONNECTION_ID);
+        
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID, 
STATEMENT_ID, proxyBackendHandler);
+        when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+        
when(connectionSession.getDatabaseConnectionManager()).thenReturn(connectionManager);
+        
when(connectionSession.getConnectionContext()).thenReturn(connectionContext);
+        FirebirdStatementResourceCleaner.clean(connectionSession, 
STATEMENT_ID, true);
+        InOrder inOrder = inOrder(connectionManager, connectionContext, 
proxyBackendHandler, connectionSession);
+        inOrder.verify(connectionManager).removeResource(proxyBackendHandler);
+        inOrder.verify(connectionContext).clearCursorContext();
+        inOrder.verify(proxyBackendHandler).close();
+        
inOrder.verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+        
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
 STATEMENT_ID));
+    }
+    
+    @Test
+    void 
assertCleanWithPreparedStatementCacheInvalidationWithoutFetchHandler() throws 
SQLException {
+        when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+        
when(connectionSession.getConnectionContext()).thenReturn(connectionContext);
+        FirebirdStatementResourceCleaner.clean(connectionSession, 
STATEMENT_ID, true);
+        verify(connectionContext).clearCursorContext();
+        
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+        verifyNoInteractions(connectionManager, proxyBackendHandler);
+    }
+    
+    @Test
+    void assertCleanWithoutPreparedStatementCacheInvalidation() throws 
SQLException {
+        
FirebirdFetchStatementCache.getInstance().registerConnection(CONNECTION_ID);
+        
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID, 
STATEMENT_ID, proxyBackendHandler);
+        when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+        
when(connectionSession.getDatabaseConnectionManager()).thenReturn(connectionManager);
+        
when(connectionSession.getConnectionContext()).thenReturn(connectionContext);
+        FirebirdStatementResourceCleaner.clean(connectionSession, 
STATEMENT_ID, false);
+        verify(connectionManager).removeResource(proxyBackendHandler);
+        verify(proxyBackendHandler).close();
+        verify(connectionContext).clearCursorContext();
+        verify(connectionSession, 
never()).invalidatePreparedStatementCache(any());
+        
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
 STATEMENT_ID));
+    }
+    
+    @Test
+    void assertCleanBeforeCloseExecutionResources() throws SQLException, 
BackendConnectionException {
+        ConnectionSession actualConnectionSession = 
mock(ConnectionSession.class, Answers.RETURNS_DEEP_STUBS);
+        ConnectionContext actualConnectionContext = 
mock(ConnectionContext.class, Answers.RETURNS_DEEP_STUBS);
+        ContextManager contextManager = mock(ContextManager.class, 
Answers.RETURNS_DEEP_STUBS);
+        ProxyBackendHandler actualProxyBackendHandler = 
mock(ProxyBackendHandler.class);
+        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        
when(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules()).thenReturn(Collections.emptyList());
+        
when(actualConnectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+        
when(actualConnectionSession.getConnectionContext()).thenReturn(actualConnectionContext);
+        
when(actualConnectionContext.getTransactionContext().getTransactionType()).thenReturn(Optional.of("XA"));
+        
when(actualConnectionSession.getTransactionStatus().isInConnectionHeldTransaction(any())).thenReturn(true);
+        ProxyDatabaseConnectionManager actualConnectionManager = new 
ProxyDatabaseConnectionManager(actualConnectionSession);
+        
when(actualConnectionSession.getDatabaseConnectionManager()).thenReturn(actualConnectionManager);
+        
FirebirdFetchStatementCache.getInstance().registerConnection(CONNECTION_ID);
+        
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID, 
STATEMENT_ID, actualProxyBackendHandler);
+        actualConnectionManager.add(actualProxyBackendHandler);
+        actualConnectionManager.markResourceInUse(actualProxyBackendHandler);
+        final AtomicBoolean closed = new AtomicBoolean(false);
+        org.mockito.Mockito.doAnswer(invocation -> {
+            if (closed.getAndSet(true)) {
+                throw new SQLException("close twice");
+            }
+            return null;
+        }).when(actualProxyBackendHandler).close();
+        FirebirdStatementResourceCleaner.clean(actualConnectionSession, 
STATEMENT_ID, true);
+        actualConnectionManager.closeExecutionResources();
+        verify(actualProxyBackendHandler, times(1)).close();
+        
verify(actualConnectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+        
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
 STATEMENT_ID));
+    }
+}
diff --git 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutorTest.java
 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutorTest.java
index d12ce19863e..fc408937cd4 100644
--- 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutorTest.java
+++ 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/execute/FirebirdExecuteStatementCommandExecutorTest.java
@@ -46,6 +46,7 @@ import 
org.apache.shardingsphere.proxy.backend.session.ServerPreparedStatementRe
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.FirebirdServerPreparedStatement;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.blob.upload.FirebirdBlobUploadCache;
+import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.SelectStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.UpdateStatement;
@@ -77,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(AutoMockExtension.class)
@@ -156,6 +158,8 @@ class FirebirdExecuteStatementCommandExecutorTest {
         assertThat(iterator.next(), isA(FirebirdGenericResponsePacket.class));
         assertFalse(iterator.hasNext());
         
assertThat(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
 STATEMENT_ID), is(proxyBackendHandler));
+        
verify(connectionSession).beginPreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(1));
+        verify(connectionSession).finishPreparedStatementCache();
     }
     
     @Test
@@ -170,6 +174,8 @@ class FirebirdExecuteStatementCommandExecutorTest {
         assertThat(executor.getResponseType(), is(ResponseType.UPDATE));
         assertThat(actual.iterator().next(), 
isA(FirebirdGenericResponsePacket.class));
         
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
 STATEMENT_ID));
+        
verify(connectionSession).beginPreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(2));
+        verify(connectionSession).finishPreparedStatementCache();
     }
     
     @Test
diff --git 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCacheTest.java
 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCacheTest.java
index 31724987f81..3989ed7b92a 100644
--- 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCacheTest.java
+++ 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCacheTest.java
@@ -72,6 +72,11 @@ class FirebirdFetchStatementCacheTest {
         assertNull(cache.getFetchBackendHandler(1, 11));
     }
     
+    @Test
+    void assertGetFetchBackendHandlerWithoutConnection() {
+        assertNull(cache.getFetchBackendHandler(1, 10));
+    }
+    
     @Test
     void assertUnregisterStatement() {
         cache.registerConnection(1);
@@ -80,6 +85,12 @@ class FirebirdFetchStatementCacheTest {
         assertNull(cache.getFetchBackendHandler(1, 10));
     }
     
+    @Test
+    void assertUnregisterStatementWithoutConnection() {
+        cache.unregisterStatement(1, 10);
+        assertFalse(statementRegistry.containsKey(1));
+    }
+    
     @Test
     void assertUnregisterConnection() {
         cache.registerConnection(1);
diff --git 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCommandExecutorTest.java
 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCommandExecutorTest.java
index c0bc6c15dea..4f3902edd64 100644
--- 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCommandExecutorTest.java
+++ 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/fetch/FirebirdFetchStatementCommandExecutorTest.java
@@ -84,13 +84,15 @@ class FirebirdFetchStatementCommandExecutorTest {
     @Test
     void assertExecuteWhenNoBackendHandler() throws SQLException {
         executor = new FirebirdFetchStatementCommandExecutor(packet, 
connectionSession);
-        Collection<DatabasePacket> actualPackets = executor.execute();
-        Iterator<DatabasePacket> packetIterator = actualPackets.iterator();
-        FirebirdFetchResponsePacket actualPacket = 
(FirebirdFetchResponsePacket) packetIterator.next();
-        assertThat(actualPackets.size(), is(1));
-        assertThat(actualPacket.getStatus(), 
is(ISCConstants.FETCH_NO_MORE_ROWS));
-        assertThat(actualPacket.getCount(), is(0));
-        assertNull(actualPacket.getRow());
+        assertNoMoreRowsResponse(executor.execute());
+    }
+    
+    @Test
+    void assertExecuteWhenNoBackendHandlerAfterSameHandleReprepare() throws 
SQLException {
+        
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID, 
STATEMENT_ID, proxyBackendHandler);
+        
FirebirdFetchStatementCache.getInstance().unregisterStatement(CONNECTION_ID, 
STATEMENT_ID);
+        executor = new FirebirdFetchStatementCommandExecutor(packet, 
connectionSession);
+        assertNoMoreRowsResponse(executor.execute());
     }
     
     @Test
@@ -139,4 +141,13 @@ class FirebirdFetchStatementCommandExecutorTest {
         assertNull(actualNoMorePacket.getRow());
         
verify(databaseConnectionManager).unmarkResourceInUse(proxyBackendHandler);
     }
+    
+    private void assertNoMoreRowsResponse(final Collection<DatabasePacket> 
actualPackets) {
+        Iterator<DatabasePacket> packetIterator = actualPackets.iterator();
+        FirebirdFetchResponsePacket actualPacket = 
(FirebirdFetchResponsePacket) packetIterator.next();
+        assertThat(actualPackets.size(), is(1));
+        assertThat(actualPacket.getStatus(), 
is(ISCConstants.FETCH_NO_MORE_ROWS));
+        assertThat(actualPacket.getCount(), is(0));
+        assertNull(actualPacket.getRow());
+    }
 }
diff --git 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutorTest.java
 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutorTest.java
index c1baea6e381..0fa8451bff4 100644
--- 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutorTest.java
+++ 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/free/FirebirdFreeStatementCommandExecutorTest.java
@@ -21,15 +21,18 @@ import 
org.apache.shardingsphere.database.protocol.firebird.exception.FirebirdPr
 import 
org.apache.shardingsphere.database.protocol.firebird.packet.command.query.statement.FirebirdFreeStatementPacket;
 import 
org.apache.shardingsphere.database.protocol.firebird.packet.generic.FirebirdGenericResponsePacket;
 import org.apache.shardingsphere.database.protocol.packet.DatabasePacket;
-import 
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.session.ServerPreparedStatementRegistry;
+import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Answers;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -37,11 +40,14 @@ import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.util.Collection;
+import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -62,9 +68,6 @@ class FirebirdFreeStatementCommandExecutorTest {
     @Mock
     private ServerPreparedStatementRegistry registry;
     
-    @Mock
-    private ProxyDatabaseConnectionManager connectionManager;
-    
     @Mock
     private ProxyBackendHandler proxyBackendHandler;
     
@@ -75,7 +78,6 @@ class FirebirdFreeStatementCommandExecutorTest {
         when(packet.getStatementId()).thenReturn(STATEMENT_ID);
         when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
         
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(registry);
-        
when(connectionSession.getDatabaseConnectionManager()).thenReturn(connectionManager);
     }
     
     @AfterEach
@@ -84,36 +86,45 @@ class FirebirdFreeStatementCommandExecutorTest {
         
FirebirdFetchStatementCache.getInstance().unregisterConnection(CONNECTION_ID);
     }
     
-    @Test
-    void assertExecuteWithDrop() {
-        when(packet.getOption()).thenReturn(FirebirdFreeStatementPacket.DROP);
-        FirebirdFreeStatementCommandExecutor executor = new 
FirebirdFreeStatementCommandExecutor(packet, connectionSession);
-        Collection<DatabasePacket> actual = executor.execute();
-        assertThat(actual.iterator().next(), 
isA(FirebirdGenericResponsePacket.class));
-        verify(registry).removePreparedStatement(1);
-    }
-    
-    @Test
-    void assertExecuteWithUnprepare() {
-        
when(packet.getOption()).thenReturn(FirebirdFreeStatementPacket.UNPREPARE);
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("preparedStatementFreeOptions")
+    void assertExecuteWithPreparedStatementFreeOption(final String scenario, 
final int option) throws Exception {
+        when(packet.getOption()).thenReturn(option);
         FirebirdFreeStatementCommandExecutor executor = new 
FirebirdFreeStatementCommandExecutor(packet, connectionSession);
         Collection<DatabasePacket> actual = executor.execute();
         assertThat(actual.iterator().next(), 
isA(FirebirdGenericResponsePacket.class));
-        verify(registry).removePreparedStatement(1);
+        verify(registry).removePreparedStatement(STATEMENT_ID);
+        
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+        
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
 STATEMENT_ID));
     }
     
     @Test
-    void assertExecuteWithClose() {
+    void assertExecuteWithClose() throws Exception {
         when(packet.getOption()).thenReturn(FirebirdFreeStatementPacket.CLOSE);
         new FirebirdFreeStatementCommandExecutor(packet, 
connectionSession).execute();
-        verify(connectionSession.getConnectionContext()).clearCursorContext();
-        verify(connectionManager).unmarkResourceInUse(proxyBackendHandler);
+        verify(connectionSession, 
never()).invalidatePreparedStatementCache(any());
         
assertNull(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
 STATEMENT_ID));
     }
     
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("preparedStatementFreeOptions")
+    void assertExecuteWithPreparedStatementFreeOptionWithoutFetchHandler(final 
String scenario, final int option) throws Exception {
+        
FirebirdFetchStatementCache.getInstance().unregisterStatement(CONNECTION_ID, 
STATEMENT_ID);
+        when(packet.getOption()).thenReturn(option);
+        new FirebirdFreeStatementCommandExecutor(packet, 
connectionSession).execute();
+        verify(registry).removePreparedStatement(STATEMENT_ID);
+        
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(STATEMENT_ID));
+    }
+    
     @Test
     void assertExecuteWithUnknownOption() {
         when(packet.getOption()).thenReturn(999);
         assertThrows(FirebirdProtocolException.class, new 
FirebirdFreeStatementCommandExecutor(packet, connectionSession)::execute);
     }
+    
+    private static Stream<Arguments> preparedStatementFreeOptions() {
+        return Stream.of(
+                
Arguments.of("execute_dropCleansPreparedStatementAndFetchResources", 
FirebirdFreeStatementPacket.DROP),
+                
Arguments.of("execute_unprepareCleansPreparedStatementAndFetchResources", 
FirebirdFreeStatementPacket.UNPREPARE));
+    }
 }
diff --git 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutorTest.java
 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutorTest.java
index d950a6a3be0..99ef0734938 100644
--- 
a/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutorTest.java
+++ 
b/proxy/frontend/dialect/firebird/src/test/java/org/apache/shardingsphere/proxy/frontend/firebird/command/query/statement/prepare/FirebirdPrepareStatementCommandExecutorTest.java
@@ -45,13 +45,18 @@ import 
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import 
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.session.ServerPreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
 import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.FirebirdServerPreparedStatement;
+import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.FirebirdStatementResourceCleaner;
+import 
org.apache.shardingsphere.proxy.frontend.firebird.command.query.statement.fetch.FirebirdFetchStatementCache;
 import org.apache.shardingsphere.sql.parser.engine.api.CacheOption;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Answers;
@@ -77,6 +82,8 @@ import static org.mockito.Mockito.when;
 @MockitoSettings(strictness = Strictness.LENIENT)
 class FirebirdPrepareStatementCommandExecutorTest {
     
+    private static final int CONNECTION_ID = 1;
+    
     private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "Firebird");
     
     @Mock
@@ -85,12 +92,24 @@ class FirebirdPrepareStatementCommandExecutorTest {
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ConnectionSession connectionSession;
     
+    @Mock
+    private ProxyDatabaseConnectionManager connectionManager;
+    
+    @Mock
+    private ProxyBackendHandler proxyBackendHandler;
+    
+    @Mock
+    private ConnectionContext connectionContext;
+    
     @BeforeEach
     void setUp() {
-        ConnectionContext connectionContext = new 
ConnectionContext(Collections::emptySet, new Grantee("foo_user"));
+        
FirebirdFetchStatementCache.getInstance().registerConnection(CONNECTION_ID);
         
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new 
ServerPreparedStatementRegistry());
         when(connectionSession.getCurrentDatabaseName()).thenReturn("foo_db");
         
when(connectionSession.getConnectionContext()).thenReturn(connectionContext);
+        when(connectionContext.getGrantee()).thenReturn(new 
Grantee("foo_user"));
+        when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
+        
when(connectionSession.getDatabaseConnectionManager()).thenReturn(connectionManager);
         when(packet.getSQL()).thenReturn("SELECT 1");
         when(packet.getHintValueContext()).thenReturn(new HintValueContext());
         when(packet.isValidStatementHandle()).thenReturn(true);
@@ -100,6 +119,12 @@ class FirebirdPrepareStatementCommandExecutorTest {
         
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts()).thenReturn(createMetaDataContexts());
     }
     
+    @AfterEach
+    void tearDown() {
+        
FirebirdFetchStatementCache.getInstance().unregisterStatement(CONNECTION_ID, 1);
+        
FirebirdFetchStatementCache.getInstance().unregisterConnection(CONNECTION_ID);
+    }
+    
     private MetaDataContexts createMetaDataContexts() {
         SQLParserRule parserRule = new SQLParserRule(new 
SQLParserRuleConfiguration(new CacheOption(128, 1024L), new CacheOption(128, 
1024L)));
         RuleMetaData globalRuleMetaData = new 
RuleMetaData(Collections.singleton(parserRule));
@@ -115,7 +140,7 @@ class FirebirdPrepareStatementCommandExecutorTest {
     }
     
     @Test
-    void assertExecute() {
+    void assertExecute() throws Exception {
         FirebirdPrepareStatementCommandExecutor executor = new 
FirebirdPrepareStatementCommandExecutor(packet, connectionSession);
         Collection<DatabasePacket> actual = executor.execute();
         FirebirdGenericResponsePacket responsePacket = 
(FirebirdGenericResponsePacket) actual.iterator().next();
@@ -127,7 +152,7 @@ class FirebirdPrepareStatementCommandExecutorTest {
     }
     
     @Test
-    void assertDescribeCountReturnsBigintType() {
+    void assertDescribeCountReturnsBigintType() throws Exception {
         when(packet.getSQL()).thenReturn("SELECT COUNT(*) FROM foo_tbl");
         when(packet.nextItem()).thenReturn(true, true, true, true, true, 
false);
         when(packet.getCurrentItem()).thenReturn(
@@ -147,4 +172,22 @@ class FirebirdPrepareStatementCommandExecutorTest {
         columnPacket.write(payload);
         verify(payload).writeInt4LE(FirebirdBinaryColumnType.INT64.getValue() 
+ 1);
     }
+    
+    @Test
+    void 
assertExecuteWithValidStatementHandleCleansPreviousPreparedStatementResources() 
throws Exception {
+        
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(1, 
new FirebirdServerPreparedStatement("SELECT 0", 
mock(SelectStatementContext.class), new HintValueContext()));
+        
FirebirdFetchStatementCache.getInstance().registerStatement(CONNECTION_ID, 1, 
proxyBackendHandler);
+        FirebirdPrepareStatementCommandExecutor executor = new 
FirebirdPrepareStatementCommandExecutor(packet, connectionSession);
+        executor.execute();
+        
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(1));
+        
assertThat(connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(1).getSql(),
 is("SELECT 1"));
+        
assertThat(FirebirdFetchStatementCache.getInstance().getFetchBackendHandler(CONNECTION_ID,
 1), is((ProxyBackendHandler) null));
+    }
+    
+    @Test
+    void assertExecuteWithValidStatementHandleWithoutFetchHandler() throws 
Exception {
+        FirebirdPrepareStatementCommandExecutor executor = new 
FirebirdPrepareStatementCommandExecutor(packet, connectionSession);
+        executor.execute();
+        
verify(connectionSession).invalidatePreparedStatementCache(FirebirdStatementResourceCleaner.createPreparedStatementCacheKey(1));
+    }
 }


Reply via email to