This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch opengauss_adapt
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/opengauss_adapt by this push:
     new 3d8021c  Support Portal for PostgreSQL Proxy (#10942)
3d8021c is described below

commit 3d8021c5bfcf38c598dcaa91f9d02014c7b2ebcc
Author: 吴伟杰 <[email protected]>
AuthorDate: Wed Jun 23 17:05:38 2021 +0800

    Support Portal for PostgreSQL Proxy (#10942)
    
    * Add OpenGauss Database type (#10292)
    
    * Update OpenGaussDataSourceMetaData
    
    * add jdbc:opengauss support (#10601)
    
    * Fix : remove @Override annotation since getDatabaseType() has been 
removed from Interface (#10836)
    
    * Fix : fix checkstyle violation (#10838)
    
    * Fix implementation of OpenGaussParserFacade (#10849)
    
    * Implements openGauss batch bind protocol (#10850)
    
    * Implements openGauss BatchBind
    
    * Add javadoc
    
    * Add OpenGauss Database type (#10292)
    
    * Update OpenGaussDataSourceMetaData
    
    * add jdbc:opengauss support (#10601)
    
    * Fix : remove @Override annotation since getDatabaseType() has been 
removed from Interface (#10836)
    
    * Fix : fix checkstyle violation (#10838)
    
    * Fix implementation of OpenGaussParserFacade (#10849)
    
    * Implements openGauss batch bind protocol (#10850)
    
    * Implements openGauss BatchBind
    
    * Add javadoc
    
    * Local openGauss JDBC driver
    
    * Support portal for PostgreSQL Proxy
    
    * Support Close Portal for PostgreSQL
    
    * Enhance PostgreSQLComExecuteExecutor
    
    * Move cached statements and result sets into DatabaseCommunicationEngine
    
    * Complete tests for DatabaseCommunicationEngine
    
    * Close TextProtocolBackendHandler correctly
    
    * Complete Portal implementation
    
    * Adapt openGauss batch bind
    
    * Revert "Local openGauss JDBC driver"
    
    This reverts commit 5a1828fb
    
    Co-authored-by: Liang Zhang <[email protected]>
    Co-authored-by: zhangliang <[email protected]>
    Co-authored-by: justbk2015 <[email protected]>
    Co-authored-by: 孙念君 Sun Nianjun <[email protected]>
---
 .../binary/bind/OpenGaussComBatchBindPacket.java   |  28 +----
 .../query/binary/bind/PostgreSQLComBindPacket.java |   4 +-
 .../binary/execute/PostgreSQLComExecutePacket.java |  10 +-
 ...t.java => PostgreSQLPortalSuspendedPacket.java} |  16 +--
 .../communication/DatabaseCommunicationEngine.java |  75 +++++++++++-
 .../backend/communication/ProxySQLExecutor.java    |   9 +-
 .../jdbc/connection/BackendConnection.java         |  50 ++------
 .../jdbc/executor/ProxyJDBCExecutor.java           |   7 +-
 .../callback/ProxyJDBCExecutorCallback.java        |  12 +-
 .../callback/ProxyJDBCExecutorCallbackFactory.java |  15 +--
 .../ProxyPreparedStatementExecutorCallback.java    |   6 +-
 .../impl/ProxyStatementExecutorCallback.java       |   6 +-
 .../backend/text/TextProtocolBackendHandler.java   |   8 ++
 .../impl/SchemaAssignedDatabaseBackendHandler.java |   5 +
 .../data/impl/UnicastDatabaseBackendHandler.java   |   5 +
 .../DatabaseCommunicationEngineTest.java           |  99 ++++++++++++++--
 .../jdbc/connection/BackendConnectionTest.java     |  73 ------------
 .../frontend/command/CommandExecutorTask.java      |  20 ++--
 .../netty/FrontendChannelInboundHandler.java       |   3 +-
 .../frontend/command/CommandExecutorTaskTest.java  |  11 +-
 .../execute/MySQLComStmtExecuteExecutor.java       |   7 +-
 .../fieldlist/MySQLComFieldListPacketExecutor.java |   7 +-
 .../text/query/MySQLComQueryPacketExecutor.java    |   5 +
 .../command/PostgreSQLCommandExecutorFactory.java  |   7 +-
 .../command/PostgreSQLConnectionContext.java       |  87 +++++++++++++-
 .../command/query/binary/PostgreSQLPortal.java     | 129 +++++++++++++++++++++
 .../binary/bind/OpenGaussComBatchBindExecutor.java |  11 +-
 .../binary/bind/PostgreSQLComBindExecutor.java     | 105 ++---------------
 .../binary/close/PostgreSQLComCloseExecutor.java   |  22 ++--
 .../execute/PostgreSQLComExecuteExecutor.java      |  75 ++++++------
 .../query/text/PostgreSQLComQueryExecutor.java     |  22 +++-
 .../PostgreSQLCommandExecuteEngineTest.java        |  10 --
 .../binary/bind/PostgreSQLComBindExecutorTest.java |  65 -----------
 .../close/PostgreSQLComCloseExecutorTest.java      |  15 ++-
 .../execute/PostgreSQLComExecuteExecutorTest.java  |  52 +++++----
 .../query/text/PostgreSQLComQueryExecutorTest.java |   6 +-
 .../frontend/command/executor/CommandExecutor.java |   8 ++
 37 files changed, 633 insertions(+), 462 deletions(-)

diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
index 8c290e9..9b767b1 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.bi
 import com.google.common.collect.Lists;
 import lombok.Getter;
 import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
-import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnFormat;
+import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatement;
@@ -47,11 +47,11 @@ public final class OpenGaussComBatchBindPacket extends 
PostgreSQLCommandPacket {
     private final String statementId;
     
     private final String sql;
-    
-    private final List<Integer> resultFormatCodes;
-    
+
     private final List<List<Object>> parameters;
     
+    private final List<PostgreSQLValueFormat> resultFormats;
+
     public OpenGaussComBatchBindPacket(final PostgreSQLPacketPayload payload, 
final int connectionId) {
         payload.readInt4();
         payload.readInt4();
@@ -63,9 +63,9 @@ public final class OpenGaussComBatchBindPacket extends 
PostgreSQLCommandPacket {
             parameterFormats.add(payload.readInt2());
         }
         int resultFormatsLength = payload.readInt2();
-        resultFormatCodes = new ArrayList<>(resultFormatsLength);
+        resultFormats = new ArrayList<>(resultFormatsLength);
         for (int i = 0; i < resultFormatsLength; i++) {
-            resultFormatCodes.add(payload.readInt2());
+            
resultFormats.add(PostgreSQLValueFormat.valueOf(payload.readInt2()));
         }
         PostgreSQLBinaryStatement binaryStatement = 
PostgreSQLBinaryStatementRegistry.getInstance().get(connectionId).getBinaryStatement(statementId);
         sql = null == binaryStatement ? null : binaryStatement.getSql();
@@ -152,22 +152,6 @@ public final class OpenGaussComBatchBindPacket extends 
PostgreSQLCommandPacket {
         return binaryProtocolValue.read(payload, parameterValueLength);
     }
     
-    /**
-     * Get result format by column index.
-     *
-     * @param columnIndex column index
-     * @return result format
-     */
-    public PostgreSQLColumnFormat getResultFormatByColumnIndex(final int 
columnIndex) {
-        if (resultFormatCodes.isEmpty()) {
-            return PostgreSQLColumnFormat.TEXT;
-        }
-        if (1 == resultFormatCodes.size()) {
-            return PostgreSQLColumnFormat.valueOf(resultFormatCodes.get(0));
-        }
-        return 
PostgreSQLColumnFormat.valueOf(resultFormatCodes.get(columnIndex));
-    }
-    
     @Override
     public void write(final PostgreSQLPacketPayload payload) {
     }
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/PostgreSQLComBindPacket.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/PostgreSQLComBindPacket.java
index 1ed09d3..7955d3b 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/PostgreSQLComBindPacket.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/PostgreSQLComBindPacket.java
@@ -43,6 +43,8 @@ import java.util.List;
 @Getter
 public final class PostgreSQLComBindPacket extends PostgreSQLCommandPacket {
     
+    private final String portal;
+    
     private final String statementId;
     
     private final String sql;
@@ -53,7 +55,7 @@ public final class PostgreSQLComBindPacket extends 
PostgreSQLCommandPacket {
     
     public PostgreSQLComBindPacket(final PostgreSQLPacketPayload payload, 
final int connectionId) {
         payload.readInt4();
-        payload.readStringNul();
+        portal = payload.readStringNul();
         statementId = payload.readStringNul();
         int parameterFormatCount = payload.readInt2();
         List<Integer> parameterFormats = new ArrayList<>(parameterFormatCount);
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
index c4469c2..f7b499a 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute;
 
+import lombok.Getter;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
@@ -25,12 +26,17 @@ import 
org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacket
 /**
  * Command execute packet for PostgreSQL.
  */
+@Getter
 public final class PostgreSQLComExecutePacket extends PostgreSQLCommandPacket {
     
+    private final String portal;
+    
+    private final int maxRows;
+    
     public PostgreSQLComExecutePacket(final PostgreSQLPacketPayload payload) {
         payload.readInt4();
-        payload.readStringNul();
-        payload.readInt4();
+        portal = payload.readStringNul();
+        maxRows = payload.readInt4();
     }
     
     @Override
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLPortalSuspendedPacket.java
similarity index 69%
copy from 
shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
copy to 
shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLPortalSuspendedPacket.java
index c4469c2..abc8647 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLComExecutePacket.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/execute/PostgreSQLPortalSuspendedPacket.java
@@ -17,21 +17,15 @@
 
 package 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute;
 
-import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
-import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
+import 
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
+import 
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
 import 
org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
 
 /**
- * Command execute packet for PostgreSQL.
+ * Portal suspended packet for PostgreSQL.
  */
-public final class PostgreSQLComExecutePacket extends PostgreSQLCommandPacket {
-    
-    public PostgreSQLComExecutePacket(final PostgreSQLPacketPayload payload) {
-        payload.readInt4();
-        payload.readStringNul();
-        payload.readInt4();
-    }
+public final class PostgreSQLPortalSuspendedPacket implements 
PostgreSQLIdentifierPacket {
     
     @Override
     public void write(final PostgreSQLPacketPayload payload) {
@@ -39,6 +33,6 @@ public final class PostgreSQLComExecutePacket extends 
PostgreSQLCommandPacket {
     
     @Override
     public PostgreSQLIdentifierTag getIdentifier() {
-        return PostgreSQLCommandPacketType.EXECUTE_COMMAND;
+        return PostgreSQLMessagePacketType.PORTAL_SUSPENDED;
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index 2f1a058..b770f31 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -44,11 +44,15 @@ import 
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryH
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 /**
@@ -73,18 +77,40 @@ public final class DatabaseCommunicationEngine {
     
     private ProxyLockEngine proxyLockEngine;
     
+    private final Collection<Statement> cachedStatements = new 
CopyOnWriteArrayList<>();
+    
+    private final Collection<ResultSet> cachedResultSets = new 
CopyOnWriteArrayList<>();
+    
     public DatabaseCommunicationEngine(final String driverType, final 
ShardingSphereMetaData metaData, final LogicSQL logicSQL, final 
BackendConnection backendConnection) {
         this.driverType = driverType;
         this.metaData = metaData;
         this.logicSQL = logicSQL;
-        proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection);
+        proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection, 
this);
         kernelProcessor = new KernelProcessor();
-        proxyLockEngine = new ProxyLockEngine(proxySQLExecutor, new 
MetadataRefreshEngine(metaData, 
-                
ProxyContext.getInstance().getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()),
 
+        proxyLockEngine = new ProxyLockEngine(proxySQLExecutor, new 
MetadataRefreshEngine(metaData,
+                
ProxyContext.getInstance().getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()),
                 ProxyContext.getInstance().getMetaDataContexts().getProps(), 
ProxyContext.getInstance().getLock().orElse(null)), 
backendConnection.getSchemaName());
     }
     
     /**
+     * Add statement.
+     *
+     * @param statement statement to be added
+     */
+    public void add(final Statement statement) {
+        cachedStatements.add(statement);
+    }
+    
+    /**
+     * Add result set.
+     *
+     * @param resultSet result set to be added
+     */
+    public void add(final ResultSet resultSet) {
+        cachedResultSets.add(resultSet);
+    }
+    
+    /**
      * Execute to database.
      *
      * @return backend response
@@ -192,4 +218,47 @@ public final class DatabaseCommunicationEngine {
     private boolean isBinary() {
         return JDBCDriverType.PREPARED_STATEMENT.equals(driverType);
     }
+    
+    /**
+     * Close database communication engine.
+     *
+     * @throws SQLException SQL exception
+     */
+    public void close() throws SQLException {
+        Collection<SQLException> result = new LinkedList<>();
+        result.addAll(closeResultSets());
+        result.addAll(closeStatements());
+        if (result.isEmpty()) {
+            return;
+        }
+        SQLException ex = new SQLException();
+        result.forEach(ex::setNextException);
+        throw ex;
+    }
+    
+    private Collection<SQLException> closeResultSets() {
+        Collection<SQLException> result = new LinkedList<>();
+        for (ResultSet each : cachedResultSets) {
+            try {
+                each.close();
+            } catch (final SQLException ex) {
+                result.add(ex);
+            }
+        }
+        cachedResultSets.clear();
+        return result;
+    }
+    
+    private Collection<SQLException> closeStatements() {
+        Collection<SQLException> result = new LinkedList<>();
+        for (Statement each : cachedStatements) {
+            try {
+                each.close();
+            } catch (final SQLException ex) {
+                result.add(ex);
+            }
+        }
+        cachedStatements.clear();
+        return result;
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index fba5049..0df7432 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -67,18 +67,21 @@ public final class ProxySQLExecutor {
     
     private final BackendConnection backendConnection;
     
+    private final DatabaseCommunicationEngine databaseCommunicationEngine;
+    
     private final ProxyJDBCExecutor jdbcExecutor;
     
     private final RawExecutor rawExecutor;
     
     private final FederateExecutor federateExecutor;
     
-    public ProxySQLExecutor(final String type, final BackendConnection 
backendConnection) {
+    public ProxySQLExecutor(final String type, final BackendConnection 
backendConnection, final DatabaseCommunicationEngine 
databaseCommunicationEngine) {
         this.type = type;
         this.backendConnection = backendConnection;
+        this.databaseCommunicationEngine = databaseCommunicationEngine;
         ExecutorEngine executorEngine = 
BackendExecutorContext.getInstance().getExecutorEngine();
         boolean isSerialExecute = backendConnection.isSerialExecute();
-        jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, new 
JDBCExecutor(executorEngine, isSerialExecute));
+        jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, 
databaseCommunicationEngine, new JDBCExecutor(executorEngine, isSerialExecute));
         MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getMetaDataContexts();
         rawExecutor = new RawExecutor(executorEngine, isSerialExecute, 
metaDataContexts.getProps());
         // TODO Consider FederateRawExecutor
@@ -145,7 +148,7 @@ public final class ProxySQLExecutor {
         }
         MetaDataContexts metaData = 
ProxyContext.getInstance().getMetaDataContexts();
         ProxyJDBCExecutorCallback callback = 
ProxyJDBCExecutorCallbackFactory.newInstance(type, 
metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(),
 
-                executionContext.getSqlStatementContext().getSqlStatement(), 
backendConnection, isReturnGeneratedKeys, isExceptionThrown, true);
+                executionContext.getSqlStatementContext().getSqlStatement(), 
databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, true);
         backendConnection.setFederateExecutor(federateExecutor);
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, 
metaData);
         return federateExecutor.executeQuery(executionContext, callback, 
prepareEngine).stream().map(each -> (ExecuteResult) 
each).collect(Collectors.toList());
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 1bcffb8..0beea56 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.Statemen
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -39,7 +40,6 @@ import 
org.apache.shardingsphere.transaction.core.TransactionType;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
@@ -74,9 +74,7 @@ public final class BackendConnection implements 
ExecutorJDBCManager {
     
     private final Multimap<String, Connection> cachedConnections = 
LinkedHashMultimap.create();
     
-    private final Collection<Statement> cachedStatements = new 
CopyOnWriteArrayList<>();
-    
-    private final Collection<ResultSet> cachedResultSets = new 
CopyOnWriteArrayList<>();
+    private final Collection<DatabaseCommunicationEngine> 
cachedDatabaseCommunicationEngines = new CopyOnWriteArrayList<>();
     
     private final Collection<ConnectionPostProcessor> connectionPostProcessors 
= new LinkedList<>();
     
@@ -213,56 +211,28 @@ public final class BackendConnection implements 
ExecutorJDBCManager {
     }
     
     /**
-     * Add statement.
-     *
-     * @param statement statement to be added
-     */
-    public void add(final Statement statement) {
-        cachedStatements.add(statement);
-    }
-    
-    /**
-     * Add result set.
+     * Add database communication engine.
      *
-     * @param resultSet result set to be added
+     * @param databaseCommunicationEngine database communication engine to be 
added
      */
-    public void add(final ResultSet resultSet) {
-        cachedResultSets.add(resultSet);
-    }
-    
-    /**
-     * Close result sets.
-     *
-     * @return SQL exception when result sets close
-     */
-    public synchronized Collection<SQLException> closeResultSets() {
-        Collection<SQLException> result = new LinkedList<>();
-        for (ResultSet each : cachedResultSets) {
-            try {
-                each.close();
-            } catch (final SQLException ex) {
-                result.add(ex);
-            }
-        }
-        cachedResultSets.clear();
-        return result;
+    public void add(final DatabaseCommunicationEngine 
databaseCommunicationEngine) {
+        cachedDatabaseCommunicationEngines.add(databaseCommunicationEngine);
     }
     
     /**
-     * Close statements.
+     * Close database communication engines.
      *
-     * @return SQL exception when statements close
+     * @return SQL exception when engine close
      */
-    public synchronized Collection<SQLException> closeStatements() {
+    public synchronized Collection<SQLException> 
closeDatabaseCommunicationEngines() {
         Collection<SQLException> result = new LinkedList<>();
-        for (Statement each : cachedStatements) {
+        for (DatabaseCommunicationEngine each : 
cachedDatabaseCommunicationEngines) {
             try {
                 each.close();
             } catch (final SQLException ex) {
                 result.add(ex);
             }
         }
-        cachedStatements.clear();
         return result;
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 4087d10..d6e12c6 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -44,6 +45,8 @@ public final class ProxyJDBCExecutor {
     
     private final BackendConnection backendConnection;
     
+    private final DatabaseCommunicationEngine databaseCommunicationEngine;
+    
     @Getter
     private final JDBCExecutor jdbcExecutor;
     
@@ -64,8 +67,8 @@ public final class ProxyJDBCExecutor {
             DatabaseType databaseType = 
metaDataContexts.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType();
             ExecuteProcessEngine.initialize(context, executionGroupContext, 
metaDataContexts.getProps());
             Collection<ExecuteResult> result = 
jdbcExecutor.execute(executionGroupContext,
-                    ProxyJDBCExecutorCallbackFactory.newInstance(type, 
databaseType, context.getSqlStatement(), backendConnection, 
isReturnGeneratedKeys, isExceptionThrown, true),
-                    ProxyJDBCExecutorCallbackFactory.newInstance(type, 
databaseType, context.getSqlStatement(), backendConnection, 
isReturnGeneratedKeys, isExceptionThrown, false));
+                    ProxyJDBCExecutorCallbackFactory.newInstance(type, 
databaseType, context.getSqlStatement(), databaseCommunicationEngine, 
isReturnGeneratedKeys, isExceptionThrown, true),
+                    ProxyJDBCExecutorCallbackFactory.newInstance(type, 
databaseType, context.getSqlStatement(), databaseCommunicationEngine, 
isReturnGeneratedKeys, isExceptionThrown, false));
             
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
             return result;
         } finally {
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
index 2d05d4f..3285833 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
@@ -28,7 +28,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
@@ -43,7 +43,7 @@ import java.util.Optional;
  */
 public abstract class ProxyJDBCExecutorCallback extends 
JDBCExecutorCallback<ExecuteResult> {
     
-    private final BackendConnection backendConnection;
+    private final DatabaseCommunicationEngine databaseCommunicationEngine;
     
     private final boolean isReturnGeneratedKeys;
     
@@ -51,10 +51,10 @@ public abstract class ProxyJDBCExecutorCallback extends 
JDBCExecutorCallback<Exe
     
     private boolean hasMetaData;
     
-    public ProxyJDBCExecutorCallback(final DatabaseType databaseType, final 
SQLStatement sqlStatement, final BackendConnection backendConnection,
+    public ProxyJDBCExecutorCallback(final DatabaseType databaseType, final 
SQLStatement sqlStatement, final DatabaseCommunicationEngine 
databaseCommunicationEngine,
                                      final boolean isReturnGeneratedKeys, 
final boolean isExceptionThrown, final boolean fetchMetaData) {
         super(databaseType, sqlStatement, isExceptionThrown);
-        this.backendConnection = backendConnection;
+        this.databaseCommunicationEngine = databaseCommunicationEngine;
         this.isReturnGeneratedKeys = isReturnGeneratedKeys;
         this.fetchMetaData = fetchMetaData;
     }
@@ -69,10 +69,10 @@ public abstract class ProxyJDBCExecutorCallback extends 
JDBCExecutorCallback<Exe
     }
     
     private ExecuteResult executeSQL(final String sql, final Statement 
statement, final ConnectionMode connectionMode, final boolean withMetadata) 
throws SQLException {
-        backendConnection.add(statement);
+        databaseCommunicationEngine.add(statement);
         if (execute(sql, statement, isReturnGeneratedKeys)) {
             ResultSet resultSet = statement.getResultSet();
-            backendConnection.add(resultSet);
+            databaseCommunicationEngine.add(resultSet);
             return createQueryResult(resultSet, connectionMode);
         }
         return new UpdateResult(statement.getUpdateCount(), 
isReturnGeneratedKeys ? getGeneratedKey(statement) : 0L);
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
index 3967393..f7edf40 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
@@ -21,7 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyPreparedStatementExecutorCallback;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyStatementExecutorCallback;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -34,23 +34,24 @@ public final class ProxyJDBCExecutorCallbackFactory {
     
     /**
      * Create new instance of Proxy JDBC executor callback.
-     * 
+     *
      * @param type driver type
      * @param databaseType database type
      * @param sqlStatement SQL statement
-     * @param backendConnection backend connection
+     * @param databaseCommunicationEngine database communication engine
      * @param isReturnGeneratedKeys is return generated keys or not
      * @param isExceptionThrown is exception thrown or not
      * @param isFetchMetaData is fetch meta data or not
      * @return instance of Proxy JDBC executor callback
      */
-    public static ProxyJDBCExecutorCallback newInstance(final String type, 
final DatabaseType databaseType, final SQLStatement sqlStatement, final 
BackendConnection backendConnection,
-                                                        final boolean 
isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean 
isFetchMetaData) {
+    public static ProxyJDBCExecutorCallback newInstance(final String type, 
final DatabaseType databaseType, final SQLStatement sqlStatement,
+                                                        final 
DatabaseCommunicationEngine databaseCommunicationEngine, final boolean 
isReturnGeneratedKeys, final boolean isExceptionThrown,
+                                                        final boolean 
isFetchMetaData) {
         if (JDBCDriverType.STATEMENT.equals(type)) {
-            return new ProxyStatementExecutorCallback(databaseType, 
sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, 
isFetchMetaData);
+            return new ProxyStatementExecutorCallback(databaseType, 
sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, 
isExceptionThrown, isFetchMetaData);
         }
         if (JDBCDriverType.PREPARED_STATEMENT.equals(type)) {
-            return new ProxyPreparedStatementExecutorCallback(databaseType, 
sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, 
isFetchMetaData);
+            return new ProxyPreparedStatementExecutorCallback(databaseType, 
sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, 
isExceptionThrown, isFetchMetaData);
         }
         throw new UnsupportedOperationException(String.format("Unsupported 
driver type: `%s`", type));
     }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
index 67bcf1c..f8b4a60 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl;
 
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
@@ -31,9 +31,9 @@ import java.sql.Statement;
  */
 public final class ProxyPreparedStatementExecutorCallback extends 
ProxyJDBCExecutorCallback {
     
-    public ProxyPreparedStatementExecutorCallback(final DatabaseType 
databaseType, final SQLStatement sqlStatement, final BackendConnection 
backendConnection,
+    public ProxyPreparedStatementExecutorCallback(final DatabaseType 
databaseType, final SQLStatement sqlStatement, final 
DatabaseCommunicationEngine databaseCommunicationEngine,
                                                   final boolean 
isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean 
fetchMetaData) {
-        super(databaseType, sqlStatement, backendConnection, 
isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+        super(databaseType, sqlStatement, databaseCommunicationEngine, 
isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
     }
     
     @Override
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
index 88647fc..17c2044 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl;
 
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
@@ -30,9 +30,9 @@ import java.sql.Statement;
  */
 public final class ProxyStatementExecutorCallback extends 
ProxyJDBCExecutorCallback {
     
-    public ProxyStatementExecutorCallback(final DatabaseType databaseType, 
final SQLStatement sqlStatement, final BackendConnection backendConnection,
+    public ProxyStatementExecutorCallback(final DatabaseType databaseType, 
final SQLStatement sqlStatement, final DatabaseCommunicationEngine 
databaseCommunicationEngine,
                                           final boolean isReturnGeneratedKeys, 
final boolean isExceptionThrown, final boolean fetchMetaData) {
-        super(databaseType, sqlStatement, backendConnection, 
isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+        super(databaseType, sqlStatement, databaseCommunicationEngine, 
isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
     }
     
     @Override
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
index f87cb1d..5c4e12f 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
@@ -55,4 +55,12 @@ public interface TextProtocolBackendHandler {
     default Collection<Object> getRowData() throws SQLException {
         return Collections.emptyList();
     }
+    
+    /**
+     * Close handler.
+     *
+     * @throws SQLException SQL exception
+     */
+    default void close() throws SQLException {
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
index f2633a8..8d8ad45 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
@@ -64,4 +64,9 @@ public final class SchemaAssignedDatabaseBackendHandler 
implements DatabaseBacke
     public Collection<Object> getRowData() throws SQLException {
         return databaseCommunicationEngine.getQueryResponseRow().getData();
     }
+    
+    @Override
+    public void close() throws SQLException {
+        databaseCommunicationEngine.close();
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
index 57ec70f..3871ad1 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
@@ -74,4 +74,9 @@ public final class UnicastDatabaseBackendHandler implements 
DatabaseBackendHandl
     public Collection<Object> getRowData() throws SQLException {
         return databaseCommunicationEngine.getQueryResponseRow().getData();
     }
+    
+    @Override
+    public void close() throws SQLException {
+        databaseCommunicationEngine.close();
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
index 5677758..e2768a4 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.shardingsphere.proxy.backend.communication;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
 import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
 import 
org.apache.shardingsphere.infra.context.metadata.impl.StandardMetaDataContexts;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
 import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
@@ -44,11 +44,17 @@ import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLSta
 import 
org.apache.shardingsphere.transaction.context.impl.StandardTransactionContexts;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.internal.util.reflection.FieldSetter;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.lang.reflect.Field;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Types;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -56,18 +62,33 @@ import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.class)
 public final class DatabaseCommunicationEngineTest {
     
+    @Mock
+    private BackendConnection backendConnection;
+    
+    @Mock
+    private Statement statement;
+    
+    @Mock
+    private ResultSet resultSet;
+    
     @Before
     public void setUp() {
-        MetaDataContexts metaDataContexts = new 
StandardMetaDataContexts(mockMetaDataMap(), 
mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class), 
+        when(backendConnection.getSchemaName()).thenReturn("schema");
+        MetaDataContexts metaDataContexts = new 
StandardMetaDataContexts(mockMetaDataMap(), 
mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class),
                 new ConfigurationProperties(new Properties()));
         ProxyContext.getInstance().init(metaDataContexts, new 
StandardTransactionContexts());
     }
@@ -81,8 +102,6 @@ public final class DatabaseCommunicationEngineTest {
     
     @Test
     public void assertBinaryProtocolQueryHeader() throws SQLException, 
NoSuchFieldException {
-        BackendConnection backendConnection = mock(BackendConnection.class);
-        when(backendConnection.getSchemaName()).thenReturn("schema");
         DatabaseCommunicationEngine engine =
                 
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
 "schemaName", Collections.emptyList(), backendConnection);
         assertNotNull(engine);
@@ -95,7 +114,7 @@ public final class DatabaseCommunicationEngineTest {
             private MemoryQueryResultRow memoryQueryResultRow;
             
             @Override
-            protected List<MemoryQueryResultRow> init(final ShardingSphereRule 
rule, final ShardingSphereSchema schema, 
+            protected List<MemoryQueryResultRow> init(final ShardingSphereRule 
rule, final ShardingSphereSchema schema,
                                                       final 
SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults) {
                 memoryQueryResultRow = mock(MemoryQueryResultRow.class);
                 return Collections.singletonList(memoryQueryResultRow);
@@ -117,7 +136,6 @@ public final class DatabaseCommunicationEngineTest {
         ShardingSphereSchema schema = mock(ShardingSphereSchema.class);
         when(schema.get("t_logic_order")).thenReturn(new 
TableMetaData(Collections.singletonList(columnMetaData), 
Collections.singletonList(new IndexMetaData("order_id"))));
         DataSourcesMetaData dataSourcesMetaData = 
mock(DataSourcesMetaData.class);
-        
when(dataSourcesMetaData.getDataSourceMetaData("ds_0")).thenReturn(mock(DataSourceMetaData.class));
         
when(result.getResource().getDataSourcesMetaData()).thenReturn(dataSourcesMetaData);
         when(result.getSchema()).thenReturn(schema);
         ShardingRule shardingRule = mock(ShardingRule.class);
@@ -132,7 +150,6 @@ public final class DatabaseCommunicationEngineTest {
         when(result.getTableName(1)).thenReturn("t_order");
         when(result.getColumnLabel(1)).thenReturn("order_id");
         when(result.getColumnName(1)).thenReturn("order_id");
-        when(result.getColumnName(2)).thenReturn("expr");
         when(result.getColumnType(1)).thenReturn(Types.INTEGER);
         when(result.isSigned(1)).thenReturn(true);
         when(result.isAutoIncrement(1)).thenReturn(true);
@@ -141,4 +158,72 @@ public final class DatabaseCommunicationEngineTest {
         when(result.isNotNull(1)).thenReturn(true);
         return result;
     }
+    
+    @Test
+    public void assertAddStatementCorrectly() {
+        DatabaseCommunicationEngine engine =
+                
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
 "schemaName", Collections.emptyList(), backendConnection);
+        engine.add(statement);
+        Collection<?> actual = getField(engine, "cachedStatements");
+        assertThat(actual.size(), is(1));
+        assertThat(actual.iterator().next(), is(statement));
+    }
+    
+    @Test
+    public void assertAddResultSetCorrectly() {
+        DatabaseCommunicationEngine engine =
+                
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
 "schemaName", Collections.emptyList(), backendConnection);
+        engine.add(resultSet);
+        Collection<?> actual = getField(engine, "cachedResultSets");
+        assertThat(actual.size(), is(1));
+        assertThat(actual.iterator().next(), is(resultSet));
+    }
+    
+    @Test
+    public void assertCloseCorrectly() throws SQLException {
+        DatabaseCommunicationEngine engine =
+                
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
 "schemaName", Collections.emptyList(), backendConnection);
+        Collection<ResultSet> cachedResultSets = getField(engine, 
"cachedResultSets");
+        cachedResultSets.add(resultSet);
+        Collection<Statement> cachedStatements = getField(engine, 
"cachedStatements");
+        cachedStatements.add(statement);
+        engine.close();
+        verify(resultSet).close();
+        verify(statement).close();
+        assertTrue(cachedResultSets.isEmpty());
+        assertTrue(cachedStatements.isEmpty());
+    }
+    
+    @Test
+    public void assertCloseResultSetsWithExceptionThrown() throws SQLException 
{
+        DatabaseCommunicationEngine engine =
+                
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class),
 "schemaName", Collections.emptyList(), backendConnection);
+        Collection<ResultSet> cachedResultSets = getField(engine, 
"cachedResultSets");
+        SQLException sqlExceptionByResultSet = new SQLException("ResultSet");
+        doThrow(sqlExceptionByResultSet).when(resultSet).close();
+        cachedResultSets.add(resultSet);
+        Collection<Statement> cachedStatements = getField(engine, 
"cachedStatements");
+        SQLException sqlExceptionByStatement = new SQLException("Statement");
+        doThrow(sqlExceptionByStatement).when(statement).close();
+        cachedStatements.add(statement);
+        SQLException actual = null;
+        try {
+            engine.close();
+        } catch (final SQLException ex) {
+            actual = ex;
+        }
+        verify(resultSet).close();
+        verify(statement).close();
+        assertTrue(cachedResultSets.isEmpty());
+        assertTrue(cachedStatements.isEmpty());
+        assertThat(actual.getNextException(), is(sqlExceptionByResultSet));
+        assertThat(actual.getNextException().getNextException(), 
is(sqlExceptionByStatement));
+    }
+    
+    @SneakyThrows
+    private <T> T getField(final DatabaseCommunicationEngine target, final 
String fieldName) {
+        Field field = 
DatabaseCommunicationEngine.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return (T) field.get(target);
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
index b7498eb..b9e3ea8 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
@@ -44,7 +44,6 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
@@ -260,78 +259,6 @@ public final class BackendConnectionTest {
     }
     
     @Test
-    public void assertAddStatementCorrectly() throws NoSuchFieldException, 
IllegalAccessException {
-        Statement statement = mock(Statement.class);
-        backendConnection.add(statement);
-        Field field = 
backendConnection.getClass().getDeclaredField("cachedStatements");
-        field.setAccessible(true);
-        assertTrue(((Collection<?>) 
field.get(backendConnection)).contains(statement));
-    }
-    
-    @Test
-    public void assertAddResultSetCorrectly() throws NoSuchFieldException, 
IllegalAccessException {
-        ResultSet resultSet = mock(ResultSet.class);
-        backendConnection.add(resultSet);
-        Field field = 
backendConnection.getClass().getDeclaredField("cachedResultSets");
-        field.setAccessible(true);
-        assertTrue(((Collection<?>) 
field.get(backendConnection)).contains(resultSet));
-    }
-    
-    @Test
-    public void assertCloseResultSetsCorrectly() throws NoSuchFieldException, 
SQLException, IllegalAccessException {
-        Field field = 
backendConnection.getClass().getDeclaredField("cachedResultSets");
-        field.setAccessible(true);
-        Collection<ResultSet> cachedResultSets = (Collection<ResultSet>) 
field.get(backendConnection);
-        ResultSet resultSet = mock(ResultSet.class);
-        cachedResultSets.add(resultSet);
-        backendConnection.closeResultSets();
-        verify(resultSet, times(1)).close();
-        assertTrue(cachedResultSets.isEmpty());
-    }
-    
-    @Test
-    public void assertCloseResultSetsWithExceptionThrown() throws 
NoSuchFieldException, SQLException, IllegalAccessException {
-        Field field = 
backendConnection.getClass().getDeclaredField("cachedResultSets");
-        field.setAccessible(true);
-        Collection<ResultSet> cachedResultSets = (Collection<ResultSet>) 
field.get(backendConnection);
-        ResultSet resultSet = mock(ResultSet.class);
-        SQLException sqlException = new SQLException("");
-        doThrow(sqlException).when(resultSet).close();
-        cachedResultSets.add(resultSet);
-        Collection<SQLException> result = backendConnection.closeResultSets();
-        verify(resultSet, times(1)).close();
-        assertTrue(cachedResultSets.isEmpty());
-        assertTrue(result.contains(sqlException));
-    }
-    
-    @Test
-    public void assertCloseStatementsCorrectly() throws NoSuchFieldException, 
SQLException, IllegalAccessException {
-        Field field = 
backendConnection.getClass().getDeclaredField("cachedStatements");
-        field.setAccessible(true);
-        Collection<Statement> cachedStatement = (Collection<Statement>) 
field.get(backendConnection);
-        Statement statement = mock(Statement.class);
-        cachedStatement.add(statement);
-        backendConnection.closeStatements();
-        verify(statement, times(1)).close();
-        assertTrue(cachedStatement.isEmpty());
-    }
-    
-    @Test
-    public void assertCloseStatementsWithExceptionThrown() throws 
SQLException, NoSuchFieldException, IllegalAccessException {
-        Field field = 
backendConnection.getClass().getDeclaredField("cachedStatements");
-        field.setAccessible(true);
-        Collection<Statement> cachedStatement = (Collection<Statement>) 
field.get(backendConnection);
-        Statement statement = mock(Statement.class);
-        cachedStatement.add(statement);
-        SQLException sqlException = new SQLException("");
-        doThrow(sqlException).when(statement).close();
-        Collection<SQLException> result = backendConnection.closeStatements();
-        verify(statement, times(1)).close();
-        assertTrue(cachedStatement.isEmpty());
-        assertTrue(result.contains(sqlException));
-    }
-    
-    @Test
     public void assertCloseConnectionsCorrectlyWhenNotForceRollback() throws 
NoSuchFieldException, IllegalAccessException, SQLException {
         Field field = 
backendConnection.getClass().getDeclaredField("cachedConnections");
         field.setAccessible(true);
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index e92d22f..d55dabd 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -90,13 +90,17 @@ public final class CommandExecutorTask implements Runnable {
         CommandPacketType type = 
commandExecuteEngine.getCommandPacketType(payload);
         CommandPacket commandPacket = 
commandExecuteEngine.getCommandPacket(payload, type, backendConnection);
         CommandExecutor commandExecutor = 
commandExecuteEngine.getCommandExecutor(type, commandPacket, backendConnection);
-        Collection<DatabasePacket<?>> responsePackets = 
commandExecutor.execute();
-        if (responsePackets.isEmpty()) {
-            return false;
-        }
-        responsePackets.forEach(context::write);
-        if (commandExecutor instanceof QueryCommandExecutor) {
-            return commandExecuteEngine.writeQueryData(context, 
backendConnection, (QueryCommandExecutor) commandExecutor, 
responsePackets.size());
+        try {
+            Collection<DatabasePacket<?>> responsePackets = 
commandExecutor.execute();
+            if (responsePackets.isEmpty()) {
+                return false;
+            }
+            responsePackets.forEach(context::write);
+            if (commandExecutor instanceof QueryCommandExecutor) {
+                return commandExecuteEngine.writeQueryData(context, 
backendConnection, (QueryCommandExecutor) commandExecutor, 
responsePackets.size());
+            }
+        } finally {
+            commandExecutor.close();
         }
         return 
databaseProtocolFrontendEngine.getFrontendContext().isFlushForPerCommandPacket();
     }
@@ -113,8 +117,6 @@ public final class CommandExecutorTask implements Runnable {
     private Collection<SQLException> closeExecutionResources() {
         Collection<SQLException> result = new LinkedList<>();
         PrimaryVisitedManager.clear();
-        result.addAll(backendConnection.closeResultSets());
-        result.addAll(backendConnection.closeStatements());
         result.addAll(backendConnection.closeFederateExecutor());
         return result;
     }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index d4d2984..cd6c297 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -94,8 +94,7 @@ public final class FrontendChannelInboundHandler extends 
ChannelInboundHandlerAd
     private void closeAllResources() {
         
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(backendConnection.getConnectionId());
         PrimaryVisitedManager.clear();
-        backendConnection.closeResultSets();
-        backendConnection.closeStatements();
+        backendConnection.closeDatabaseCommunicationEngines();
         backendConnection.closeConnections(true);
         backendConnection.closeFederateExecutor();
         databaseProtocolFrontendEngine.release(backendConnection);
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 822d715..b9422df 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -102,14 +102,13 @@ public final class CommandExecutorTaskTest {
         
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
         when(engine.getCodecEngine()).thenReturn(codecEngine);
-        
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-        
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, 
backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
         verify(connectionStatus).switchToUsing();
+        verify(queryCommandExecutor).close();
     }
     
     @Test
@@ -123,8 +122,6 @@ public final class CommandExecutorTaskTest {
         
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
         when(engine.getCodecEngine()).thenReturn(codecEngine);
-        
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-        
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, 
backendConnection, handlerContext, message);
@@ -134,6 +131,7 @@ public final class CommandExecutorTaskTest {
         verify(handlerContext).write(databasePacket);
         verify(handlerContext).flush();
         verify(executeEngine).writeQueryData(handlerContext, 
backendConnection, queryCommandExecutor, 1);
+        verify(queryCommandExecutor).close();
     }
     
     @Test
@@ -148,8 +146,6 @@ public final class CommandExecutorTaskTest {
         
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
         when(engine.getCodecEngine()).thenReturn(codecEngine);
-        
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-        
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, 
backendConnection, handlerContext, message);
@@ -158,6 +154,7 @@ public final class CommandExecutorTaskTest {
         verify(connectionStatus).switchToUsing();
         verify(handlerContext).write(databasePacket);
         verify(handlerContext).flush();
+        verify(commandExecutor).close();
     }
     
     @Test
@@ -169,8 +166,6 @@ public final class CommandExecutorTaskTest {
         when(executeEngine.getErrorPacket(mockException, 
backendConnection)).thenReturn(databasePacket);
         
when(executeEngine.getOtherPacket(backendConnection)).thenReturn(Optional.of(databasePacket));
         when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
-        
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-        
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, 
backendConnection, handlerContext, message);
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index a6569cd..afcde18 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -25,8 +25,8 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
@@ -111,4 +111,9 @@ public final class MySQLComStmtExecuteExecutor implements 
QueryCommandExecutor {
         return new BinaryRow(queryResponseRow.getCells().stream().map(
             each -> new 
BinaryCell(MySQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell) 
each).getJdbcType()), each.getData())).collect(Collectors.toList()));
     }
+    
+    @Override
+    public void close() throws SQLException {
+        // Closing this executor is delegated to the database communication engine it holds.
+        databaseCommunicationEngine.close();
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
index 3e2b1bb..9f7e811 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
@@ -23,12 +23,12 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fie
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.SQLException;
@@ -76,4 +76,9 @@ public final class MySQLComFieldListPacketExecutor implements 
CommandExecutor {
         result.add(new MySQLEofPacket(++currentSequenceId));
         return result;
     }
+    
+    @Override
+    public void close() throws SQLException {
+        // Closing this executor is delegated to the database communication engine it holds.
+        databaseCommunicationEngine.close();
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
index 9984604..9d06568 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
@@ -79,4 +79,9 @@ public final class MySQLComQueryPacketExecutor implements 
QueryCommandExecutor {
     public MySQLPacket getQueryRowPacket() throws SQLException {
         return new MySQLTextResultSetRowPacket(++currentSequenceId, 
textProtocolBackendHandler.getRowData());
     }
+    
+    @Override
+    public void close() throws SQLException {
+        // Closing this executor is delegated to the text protocol backend handler it holds.
+        textProtocolBackendHandler.close();
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
index 6da37e2..f5bb0c1 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQ
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.OpenGaussComBatchBindPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket;
+import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLComExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.parse.PostgreSQLComParsePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLComQueryPacket;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
@@ -65,7 +66,7 @@ public final class PostgreSQLCommandExecutorFactory {
         log.debug("Execute packet type: {}, value: {}", commandPacketType, 
commandPacket);
         switch (commandPacketType) {
             case SIMPLE_QUERY:
-                return new 
PostgreSQLComQueryExecutor((PostgreSQLComQueryPacket) commandPacket, 
backendConnection);
+                return new PostgreSQLComQueryExecutor(connectionContext, 
(PostgreSQLComQueryPacket) commandPacket, backendConnection);
             case PARSE_COMMAND:
                 return new PostgreSQLComParseExecutor(connectionContext, 
(PostgreSQLComParsePacket) commandPacket, backendConnection);
             case BIND_COMMAND:
@@ -77,11 +78,11 @@ public final class PostgreSQLCommandExecutorFactory {
                 connectionContext.getPendingExecutors().add(new 
PostgreSQLComDescribeExecutor(connectionContext));
                 break;
             case EXECUTE_COMMAND:
-                return new PostgreSQLComExecuteExecutor(connectionContext);
+                return new PostgreSQLComExecuteExecutor(connectionContext, 
(PostgreSQLComExecutePacket) commandPacket);
             case SYNC_COMMAND:
                 return new PostgreSQLComSyncExecutor(connectionContext, 
backendConnection);
             case CLOSE_COMMAND:
-                connectionContext.getPendingExecutors().add(new 
PostgreSQLComCloseExecutor((PostgreSQLComClosePacket) commandPacket, 
backendConnection));
+                connectionContext.getPendingExecutors().add(new 
PostgreSQLComCloseExecutor(connectionContext, (PostgreSQLComClosePacket) 
commandPacket, backendConnection));
                 break;
             case TERMINATE:
                 return new PostgreSQLComTerminationExecutor();
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
index 43a01ed..c05f4ad 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
@@ -19,28 +19,113 @@ package 
org.apache.shardingsphere.proxy.frontend.postgresql.command;
 
 import lombok.Getter;
 import lombok.Setter;
+import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
+import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.PostgreSQLPortal;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe.PostgreSQLComDescribeExecutor;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
 
+import java.sql.SQLException;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
  * PostgreSQL connection context.
  */
-@Getter
 @Setter
 public final class PostgreSQLConnectionContext {
     
+    private final Map<String, PostgreSQLPortal> portals = new 
LinkedHashMap<>();
+    
+    @Getter
     private final Collection<CommandExecutor> pendingExecutors = new 
LinkedList<>();
     
     private SQLStatement sqlStatement;
     
+    @Getter
     private long updateCount;
     
     /**
+     * Create a portal.
+     *
+     * <p>The new portal is stored under the given name. If a portal is already stored under that name it is replaced
+     * in the map; this method does not close the replaced portal.</p>
+     *
+     * @param portal portal name
+     * @param sql sql
+     * @param parameters bind parameters
+     * @param resultFormats result formats
+     * @param backendConnection backend connection
+     * @return a new portal
+     * @throws SQLException if constructing the portal fails
+     */
+    public PostgreSQLPortal createPortal(final String portal, final String 
sql, final List<Object> parameters, final List<PostgreSQLValueFormat> 
resultFormats, final BackendConnection backendConnection) throws SQLException {
+        // The SQL is parsed only when no statement is present on this context; otherwise the statement already
+        // held in the sqlStatement field is used for the portal as-is.
+        // NOTE(review): getSqlStatement() is declared outside this hunk; presumably it wraps the sqlStatement field in an Optional - confirm.
+        if (!getSqlStatement().isPresent()) {
+            SQLStatement result = parseSql(sql, 
backendConnection.getSchemaName());
+            setSqlStatement(result);
+        }
+        PostgreSQLPortal result = new PostgreSQLPortal(sqlStatement, sql, 
parameters, resultFormats, backendConnection);
+        portals.put(portal, result);
+        return result;
+    }
+    
+    private SQLStatement parseSql(final String sql, final String schemaName) {
+        // An empty SQL string is never handed to the parser; it is represented by an EmptyStatement instead.
+        if (sql.isEmpty()) {
+            return new EmptyStatement();
+        }
+        // The parser engine is created for the trunk database type name of the resource behind the given schema.
+        ShardingSphereSQLParserEngine sqlStatementParserEngine = new 
ShardingSphereSQLParserEngine(
+                
DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType()));
+        // NOTE(review): the second argument presumably enables the parser's statement cache - confirm against ShardingSphereSQLParserEngine.
+        return sqlStatementParserEngine.parse(sql, true);
+    }
+    
+    /**
+     * Get portal.
+     *
+     * @param portal portal name
+     * @return portal stored under the name, or {@code null} if no portal is stored under it
+     */
+    public PostgreSQLPortal getPortal(final String portal) {
+        return portals.get(portal);
+    }
+    
+    /**
+     * Close portal.
+     *
+     * <p>The portal is removed from this context before it is closed, so it stays removed even if closing fails.
+     * Nothing happens when no portal is stored under the name.</p>
+     *
+     * @param portal portal name
+     * @throws SQLException if closing the portal fails
+     */
+    public void closePortal(final String portal) throws SQLException {
+        PostgreSQLPortal result = portals.remove(portal);
+        if (null != result) {
+            result.close();
+        }
+    }
+    
+    /**
+     * Close all portals.
+     *
+     * <p>Every stored portal is closed and the portal map is cleared afterwards. A {@link SQLException} raised while
+     * closing one portal is collected so that the remaining portals are still closed.</p>
+     */
+    public void closeAllPortals() {
+        Collection<SQLException> result = new LinkedList<>();
+        for (PostgreSQLPortal each : portals.values()) {
+            try {
+                each.close();
+            } catch (final SQLException ex) {
+                result.add(ex);
+            }
+        }
+        portals.clear();
+        if (result.isEmpty()) {
+            return;
+        }
+        // NOTE(review): the aggregate exception below is assembled but never thrown or logged, so close failures
+        // are silently dropped. Propagating it would require declaring "throws SQLException" - confirm the intent with callers.
+        SQLException ex = new SQLException("Close all portals failed.");
+        result.forEach(ex::setNextException);
+    }
+    
+    /**
      * Get describe command executor.
      *
      * @return describe command executor
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/PostgreSQLPortal.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/PostgreSQLPortal.java
new file mode 100644
index 0000000..ce57367
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/PostgreSQLPortal.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary;
+
+import lombok.Getter;
+import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
+import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
+import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
+import 
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
+import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
+import 
org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
+import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
+import 
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * PostgreSQL portal.
+ *
+ * <p>A portal is backed by exactly one of two executors, chosen in the constructor: a text protocol backend handler
+ * for TCL and empty statements, or a binary protocol database communication engine for every other statement.
+ * The field that is not used is {@code null}.</p>
+ */
+@Getter
+public final class PostgreSQLPortal {
+    
+    // Result formats requested for this portal; may be empty (see determineValueFormat).
+    private final List<PostgreSQLValueFormat> resultFormats;
+    
+    // Non-null unless the statement is a TCL or empty statement.
+    private final DatabaseCommunicationEngine databaseCommunicationEngine;
+    
+    // Non-null only when the statement is a TCL or empty statement.
+    private final TextProtocolBackendHandler textProtocolBackendHandler;
+    
+    /**
+     * Create a portal for a statement.
+     *
+     * @param sqlStatement parsed SQL statement, used to choose the backing executor
+     * @param sql sql
+     * @param parameters bind parameters
+     * @param resultFormats result formats
+     * @param backendConnection backend connection
+     * @throws SQLException if creating the backing executor fails
+     */
+    public PostgreSQLPortal(final SQLStatement sqlStatement, final String sql, 
final List<Object> parameters, final List<PostgreSQLValueFormat> resultFormats, 
final BackendConnection backendConnection) throws SQLException {
+        this.resultFormats = resultFormats;
+        // TCL and empty statements go through the text protocol handler; the bind parameters are not passed to it.
+        if (sqlStatement instanceof TCLStatement || sqlStatement instanceof 
EmptyStatement) {
+            databaseCommunicationEngine = null;
+            textProtocolBackendHandler = 
TextProtocolBackendHandlerFactory.newInstance(DatabaseTypeRegistry.getActualDatabaseType("PostgreSQL"),
 sql, backendConnection);
+            return;
+        }
+        databaseCommunicationEngine = 
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatement,
 sql, parameters, backendConnection);
+        textProtocolBackendHandler = null;
+    }
+    
+    /**
+     * Execute portal.
+     *
+     * <p>Runs whichever backing executor was created for this portal.</p>
+     *
+     * @return response header
+     * @throws SQLException SQL exception
+     */
+    public ResponseHeader execute() throws SQLException {
+        return null != databaseCommunicationEngine ? 
databaseCommunicationEngine.execute() : textProtocolBackendHandler.execute();
+    }
+    
+    /**
+     * Next.
+     *
+     * <p>Always false for a portal backed by the text protocol handler.</p>
+     *
+     * @return true if the portal has next packet; else false
+     * @throws SQLException SQL exception
+     */
+    public boolean next() throws SQLException {
+        return null != databaseCommunicationEngine && 
databaseCommunicationEngine.next();
+    }
+    
+    /**
+     * Fetch next packet from the portal.
+     *
+     * <p>The database communication engine is dereferenced without a null check, so this must not be called on a
+     * portal backed by the text protocol handler.</p>
+     *
+     * @return next packet
+     * @throws SQLException SQL exception
+     */
+    public PostgreSQLPacket nextPacket() throws SQLException {
+        QueryResponseRow queryResponseRow = 
databaseCommunicationEngine.getQueryResponseRow();
+        return new PostgreSQLDataRowPacket(getData(queryResponseRow));
+    }
+    
+    private List<Object> getData(final QueryResponseRow queryResponseRow) {
+        Collection<QueryResponseCell> cells = queryResponseRow.getCells();
+        List<Object> result = new ArrayList<>(cells.size());
+        // Copy into a list so cells can be addressed by column index.
+        List<QueryResponseCell> columns = new ArrayList<>(cells);
+        for (int i = 0; i < columns.size(); i++) {
+            PostgreSQLValueFormat format = determineValueFormat(i);
+            // Binary columns are wrapped in a BinaryCell; all other columns contribute their raw data.
+            result.add(PostgreSQLValueFormat.BINARY == format ? 
createBinaryCell(columns.get(i)) : columns.get(i).getData());
+        }
+        return result;
+    }
+    
+    private PostgreSQLValueFormat determineValueFormat(final int columnIndex) {
+        // No result formats means text for every column; otherwise the formats are applied cyclically,
+        // so a single format covers all columns.
+        return resultFormats.isEmpty() ? PostgreSQLValueFormat.TEXT : 
resultFormats.get(columnIndex % resultFormats.size());
+    }
+    
+    private BinaryCell createBinaryCell(final QueryResponseCell cell) {
+        // The cast assumes the cell is a BinaryQueryResponseCell; any other cell type fails with ClassCastException.
+        return new 
BinaryCell(PostgreSQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell)
 cell).getJdbcType()), cell.getData());
+    }
+    
+    /**
+     * Close portal.
+     *
+     * <p>Closes whichever backing executor this portal holds.</p>
+     *
+     * @throws SQLException if closing the backing executor fails
+     */
+    public void close() throws SQLException {
+        if (null != databaseCommunicationEngine) {
+            databaseCommunicationEngine.close();
+        }
+        if (null != textProtocolBackendHandler) {
+            textProtocolBackendHandler.close();
+        }
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
index b85a712..ee879bd 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
@@ -22,7 +22,7 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
-import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnFormat;
+import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
@@ -159,12 +159,17 @@ public final class OpenGaussComBatchBindExecutor 
implements QueryCommandExecutor
         List<Object> result = new ArrayList<>(cells.size());
         List<QueryResponseCell> columns = new ArrayList<>(cells);
         for (int i = 0; i < columns.size(); i++) {
-            PostgreSQLColumnFormat format = 
packet.getResultFormatByColumnIndex(i);
-            result.add(PostgreSQLColumnFormat.BINARY == format ? 
createBinaryCell(columns.get(i)) : columns.get(i).getData());
+            PostgreSQLValueFormat format = determineValueFormat(i);
+            result.add(PostgreSQLValueFormat.BINARY == format ? 
createBinaryCell(columns.get(i)) : columns.get(i).getData());
         }
         return result;
     }
     
+    private PostgreSQLValueFormat determineValueFormat(final int columnIndex) {
+        List<PostgreSQLValueFormat> resultFormats = packet.getResultFormats();
+        // No result formats means text for every column; otherwise the formats are applied cyclically,
+        // so a single format covers all columns.
+        return resultFormats.isEmpty() ? PostgreSQLValueFormat.TEXT : 
resultFormats.get(columnIndex % resultFormats.size());
+    }
+    
     private BinaryCell createBinaryCell(final QueryResponseCell cell) {
         return new 
BinaryCell(PostgreSQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell)
 cell).getJdbcType()), cell.getData());
     }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
index 78a1a31..200e621 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
@@ -17,42 +17,22 @@
 
 package 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind;
 
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
-import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
-import 
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
-import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
-import 
org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
-import 
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
-import 
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
-import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
-import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
+import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.PostgreSQLPortal;
 
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -61,7 +41,7 @@ import java.util.List;
  * Command bind executor for PostgreSQL.
  */
 @RequiredArgsConstructor
-public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
+public final class PostgreSQLComBindExecutor implements CommandExecutor {
     
     private final PostgreSQLConnectionContext connectionContext;
     
@@ -69,60 +49,22 @@ public final class PostgreSQLComBindExecutor implements 
QueryCommandExecutor {
     
     private final BackendConnection backendConnection;
     
-    private DatabaseCommunicationEngine databaseCommunicationEngine;
-    
-    private TextProtocolBackendHandler textProtocolBackendHandler;
-    
-    @Getter
-    private volatile ResponseType responseType;
-    
     @Override
     public Collection<DatabasePacket<?>> execute() throws SQLException {
-        init();
+        PostgreSQLPortal portal = 
connectionContext.createPortal(packet.getPortal(), packet.getSql(), 
packet.getParameters(), packet.getResultFormats(), backendConnection);
         List<DatabasePacket<?>> result = new LinkedList<>();
         result.add(new PostgreSQLBindCompletePacket());
-        if (null == databaseCommunicationEngine && null == 
textProtocolBackendHandler) {
-            return result;
-        }
-        ResponseHeader responseHeader = null != databaseCommunicationEngine ? 
databaseCommunicationEngine.execute() : textProtocolBackendHandler.execute();
-        if (responseHeader instanceof QueryResponseHeader && 
connectionContext.getDescribeExecutor().isPresent()) {
-            
connectionContext.getDescribeExecutor().get().setRowDescriptionPacket(getRowDescriptionPacket((QueryResponseHeader)
 responseHeader));
+        ResponseHeader responseHeader = portal.execute();
+        if (responseHeader instanceof QueryResponseHeader) {
+            connectionContext.getDescribeExecutor().ifPresent(describeExecutor 
-> 
describeExecutor.setRowDescriptionPacket(createRowDescriptionPacket((QueryResponseHeader)
 responseHeader)));
         }
         if (responseHeader instanceof UpdateResponseHeader) {
-            responseType = ResponseType.UPDATE;
             connectionContext.setUpdateCount(((UpdateResponseHeader) 
responseHeader).getUpdateCount());
         }
         return result;
     }
     
-    private void init() throws SQLException {
-        SQLStatement sqlStatement = getSqlStatement();
-        if (sqlStatement instanceof TCLStatement || sqlStatement instanceof 
EmptyStatement) {
-            textProtocolBackendHandler = 
TextProtocolBackendHandlerFactory.newInstance(DatabaseTypeRegistry.getActualDatabaseType("PostgreSQL"),
 packet.getSql(), backendConnection);
-            return;
-        }
-        databaseCommunicationEngine = 
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatement,
 packet.getSql(), packet.getParameters(), backendConnection);
-    }
-    
-    private SQLStatement getSqlStatement() {
-        return connectionContext.getSqlStatement().orElseGet(() -> {
-            SQLStatement result = parseSql(packet.getSql(), 
backendConnection.getSchemaName());
-            connectionContext.setSqlStatement(result);
-            return result;
-        });
-    }
-    
-    private SQLStatement parseSql(final String sql, final String schemaName) {
-        if (sql.isEmpty()) {
-            return new EmptyStatement();
-        }
-        ShardingSphereSQLParserEngine sqlStatementParserEngine = new 
ShardingSphereSQLParserEngine(
-                
DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType()));
-        return sqlStatementParserEngine.parse(sql, true);
-    }
-    
-    private PostgreSQLRowDescriptionPacket getRowDescriptionPacket(final 
QueryResponseHeader queryResponseHeader) {
-        responseType = ResponseType.QUERY;
+    private PostgreSQLRowDescriptionPacket createRowDescriptionPacket(final 
QueryResponseHeader queryResponseHeader) {
         Collection<PostgreSQLColumnDescription> columnDescriptions = 
createColumnDescriptions(queryResponseHeader);
         return new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), 
columnDescriptions);
     }
@@ -135,35 +77,4 @@ public final class PostgreSQLComBindExecutor implements 
QueryCommandExecutor {
         }
         return result;
     }
-    
-    @Override
-    public boolean next() throws SQLException {
-        return null != databaseCommunicationEngine && 
databaseCommunicationEngine.next();
-    }
-    
-    @Override
-    public PostgreSQLPacket getQueryRowPacket() throws SQLException {
-        QueryResponseRow queryResponseRow = 
databaseCommunicationEngine.getQueryResponseRow();
-        return new PostgreSQLDataRowPacket(getData(queryResponseRow));
-    }
-    
-    private List<Object> getData(final QueryResponseRow queryResponseRow) {
-        Collection<QueryResponseCell> cells = queryResponseRow.getCells();
-        List<Object> result = new ArrayList<>(cells.size());
-        List<QueryResponseCell> columns = new ArrayList<>(cells);
-        for (int i = 0; i < columns.size(); i++) {
-            PostgreSQLValueFormat format = determineValueFormat(i);
-            result.add(PostgreSQLValueFormat.BINARY == format ? 
createBinaryCell(columns.get(i)) : columns.get(i).getData());
-        }
-        return result;
-    }
-    
-    private PostgreSQLValueFormat determineValueFormat(final int columnIndex) {
-        List<PostgreSQLValueFormat> resultFormats = packet.getResultFormats();
-        return resultFormats.isEmpty() ? PostgreSQLValueFormat.TEXT : 
resultFormats.get(columnIndex % resultFormats.size());
-    }
-    
-    private BinaryCell createBinaryCell(final QueryResponseCell cell) {
-        return new 
BinaryCell(PostgreSQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell)
 cell).getJdbcType()), cell.getData());
-    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutor.java
index 6504938..e0b8532 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutor.java
@@ -19,15 +19,13 @@ package 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLErrorCode;
-import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLMessageSeverityLevel;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.ConnectionScopeBinaryStatementRegistry;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLCloseCompletePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket;
-import 
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
+import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -39,6 +37,8 @@ import java.util.Collections;
 @RequiredArgsConstructor
 public final class PostgreSQLComCloseExecutor implements CommandExecutor {
     
+    private final PostgreSQLConnectionContext connectionContext;
+    
     private final PostgreSQLComClosePacket packet;
     
     private final BackendConnection backendConnection;
@@ -47,25 +47,25 @@ public final class PostgreSQLComCloseExecutor implements 
CommandExecutor {
     public Collection<DatabasePacket<?>> execute() throws SQLException {
         switch (packet.getType()) {
             case PREPARED_STATEMENT:
-                return closePreparedStatement();
+                closePreparedStatement();
+                break;
             case PORTAL:
-                return closePortal();
+                closePortal();
+                break;
             default:
                 throw new 
UnsupportedOperationException(packet.getType().name());
         }
+        return Collections.singletonList(new PostgreSQLCloseCompletePacket());
     }
     
-    private Collection<DatabasePacket<?>> closePreparedStatement() {
+    private void closePreparedStatement() {
         ConnectionScopeBinaryStatementRegistry binaryStatementRegistry = 
PostgreSQLBinaryStatementRegistry.getInstance().get(backendConnection.getConnectionId());
         if (null != binaryStatementRegistry) {
             binaryStatementRegistry.closeStatement(packet.getName());
         }
-        return Collections.singletonList(new PostgreSQLCloseCompletePacket());
     }
     
-    private Collection<DatabasePacket<?>> closePortal() {
-        PostgreSQLErrorResponsePacket packet = 
PostgreSQLErrorResponsePacket.newBuilder(
-                PostgreSQLMessageSeverityLevel.ERROR, 
PostgreSQLErrorCode.FEATURE_NOT_SUPPORTED, "Not implemented: Close 
portal").build();
-        return Collections.singleton(packet);
+    private void closePortal() throws SQLException {
+        connectionContext.closePortal(packet.getName());
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
index 1e4ba7e..af6d6e9 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
@@ -20,19 +20,21 @@ package 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
+import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLComExecutePacket;
+import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLPortalSuspendedPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import 
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
-import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
+import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.PostgreSQLPortal;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackStatement;
 
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Optional;
 
@@ -40,51 +42,48 @@ import java.util.Optional;
  * Command execute executor for PostgreSQL.
  */
 @RequiredArgsConstructor
-public final class PostgreSQLComExecuteExecutor implements 
QueryCommandExecutor {
+public final class PostgreSQLComExecuteExecutor implements CommandExecutor {
     
     private final PostgreSQLConnectionContext connectionContext;
     
-    private final Collection<QueryCommandExecutor> queryCommandExecutors = new 
LinkedList<>();
+    private final PostgreSQLComExecutePacket packet;
     
     private long dataRows;
     
-    private boolean commandComplete;
-    
     @Override
     public Collection<DatabasePacket<?>> execute() throws SQLException {
         Collection<DatabasePacket<?>> result = new LinkedList<>();
         for (CommandExecutor each : connectionContext.getPendingExecutors()) {
-            if (each instanceof QueryCommandExecutor) {
-                queryCommandExecutors.add((QueryCommandExecutor) each);
-            }
             result.addAll(each.execute());
         }
         connectionContext.getPendingExecutors().clear();
+        result.addAll(doExecute());
+        result.add(createExecutionCompletedPacket());
         return result;
     }
     
-    @Override
-    public ResponseType getResponseType() {
-        return ResponseType.QUERY;
+    private Collection<? extends DatabasePacket<?>> doExecute() throws 
SQLException {
+        Collection<DatabasePacket<?>> result = new LinkedList<>();
+        while (!reachedMaxRows()) {
+            Optional<DatabasePacket<?>> packet = getPacketFromPortal();
+            if (!packet.isPresent()) {
+                break;
+            }
+            dataRows++;
+            result.add(packet.get());
+        }
+        return result;
     }
     
-    @Override
-    public boolean next() throws SQLException {
-        return !commandComplete;
+    private Optional<DatabasePacket<?>> getPacketFromPortal() throws 
SQLException {
+        PostgreSQLPortal portal = 
connectionContext.getPortal(packet.getPortal());
+        return portal.next() ? Optional.of(portal.nextPacket()) : 
Optional.empty();
     }
     
-    @Override
-    public DatabasePacket<?> getQueryRowPacket() throws SQLException {
-        Optional<DatabasePacket<?>> result = 
getPacketFromQueryCommandExecutors();
-        if (result.isPresent()) {
-            dataRows++;
-            return result.get();
+    private PostgreSQLIdentifierPacket createExecutionCompletedPacket() {
+        if (reachedMaxRows()) {
+            return new PostgreSQLPortalSuspendedPacket();
         }
-        return createCommandCompletePacket();
-    }
-    
-    private PostgreSQLIdentifierPacket createCommandCompletePacket() {
-        commandComplete = true;
         if 
(connectionContext.getSqlStatement().map(EmptyStatement.class::isInstance).orElse(false))
 {
             return new PostgreSQLEmptyQueryResponsePacket();
         }
@@ -94,16 +93,18 @@ public final class PostgreSQLComExecuteExecutor implements 
QueryCommandExecutor
         return result;
     }
     
-    private Optional<DatabasePacket<?>> getPacketFromQueryCommandExecutors() 
throws SQLException {
-        Iterator<QueryCommandExecutor> iterator = 
queryCommandExecutors.iterator();
-        while (iterator.hasNext()) {
-            QueryCommandExecutor next = iterator.next();
-            if (next.next()) {
-                return Optional.of(next.getQueryRowPacket());
-            } else {
-                iterator.remove();
-            }
+    private boolean reachedMaxRows() {
+        return packet.getMaxRows() > 0 && dataRows == packet.getMaxRows();
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        if (!reachedMaxRows()) {
+            connectionContext.getPortal(packet.getPortal()).close();
+        }
+        if (connectionContext.getSqlStatement().isPresent() &&
+                (connectionContext.getSqlStatement().get() instanceof 
CommitStatement || connectionContext.getSqlStatement().get() instanceof 
RollbackStatement)) {
+            connectionContext.closeAllPortals();
         }
-        return Optional.empty();
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
index 5e11134..6bd5074 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
@@ -36,8 +36,12 @@ import 
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackStatement;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -49,12 +53,15 @@ import java.util.LinkedList;
  */
 public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
     
+    private final PostgreSQLConnectionContext connectionContext;
+    
     private final TextProtocolBackendHandler textProtocolBackendHandler;
     
     @Getter
     private volatile ResponseType responseType;
     
-    public PostgreSQLComQueryExecutor(final PostgreSQLComQueryPacket 
comQueryPacket, final BackendConnection backendConnection) throws SQLException {
+    public PostgreSQLComQueryExecutor(final PostgreSQLConnectionContext 
connectionContext, final PostgreSQLComQueryPacket comQueryPacket, final 
BackendConnection backendConnection) throws SQLException {
+        this.connectionContext = connectionContext;
         textProtocolBackendHandler = 
TextProtocolBackendHandlerFactory.newInstance(DatabaseTypeRegistry.getActualDatabaseType("PostgreSQL"),
 comQueryPacket.getSql(), backendConnection);
     }
     
@@ -84,8 +91,12 @@ public final class PostgreSQLComQueryExecutor implements 
QueryCommandExecutor {
     }
     
     private PostgreSQLPacket createUpdatePacket(final UpdateResponseHeader 
updateResponseHeader) {
-        return updateResponseHeader.getSqlStatement() instanceof 
EmptyStatement ? new PostgreSQLEmptyQueryResponsePacket()
-                : new 
PostgreSQLCommandCompletePacket(PostgreSQLCommand.valueOf(updateResponseHeader.getSqlStatement().getClass()).map(Enum::name).orElse(""),
 updateResponseHeader.getUpdateCount());
+        SQLStatement sqlStatement = updateResponseHeader.getSqlStatement();
+        if (sqlStatement instanceof CommitStatement || sqlStatement instanceof 
RollbackStatement) {
+            connectionContext.closeAllPortals();
+        }
+        return sqlStatement instanceof EmptyStatement ? new 
PostgreSQLEmptyQueryResponsePacket()
+                : new 
PostgreSQLCommandCompletePacket(PostgreSQLCommand.valueOf(sqlStatement.getClass()).map(Enum::name).orElse(""),
 updateResponseHeader.getUpdateCount());
     }
     
     @Override
@@ -97,4 +108,9 @@ public final class PostgreSQLComQueryExecutor implements 
QueryCommandExecutor {
     public PostgreSQLPacket getQueryRowPacket() throws SQLException {
         return new 
PostgreSQLDataRowPacket(textProtocolBackendHandler.getRowData());
     }
+    
+    @Override
+    public void close() throws SQLException {
+        textProtocolBackendHandler.close();
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
index ec753b1..25c67ea 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.Res
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
-import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind.PostgreSQLComBindExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.sync.PostgreSQLComSyncExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.text.PostgreSQLComQueryExecutor;
 import org.apache.shardingsphere.transaction.core.TransactionType;
@@ -128,13 +127,4 @@ public final class PostgreSQLCommandExecuteEngineTest {
         verify(channelHandlerContext).flush();
         
verify(channelHandlerContext).write(isA(PostgreSQLReadyForQueryPacket.class));
     }
-    
-    @Test
-    public void assertWriteQueryDataWithComBindWithUpdateResponse() throws 
SQLException {
-        PostgreSQLComBindExecutor bindExecutor = 
mock(PostgreSQLComBindExecutor.class);
-        when(bindExecutor.getResponseType()).thenReturn(ResponseType.UPDATE);
-        PostgreSQLCommandExecuteEngine commandExecuteEngine = new 
PostgreSQLCommandExecuteEngine();
-        boolean actual = 
commandExecuteEngine.writeQueryData(channelHandlerContext, backendConnection, 
bindExecutor, 0);
-        assertFalse(actual);
-    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
index 9ace085..00cd592 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
@@ -17,22 +17,14 @@
 
 package 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind;
 
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import 
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
-import 
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
-import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
 import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
-import 
org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe.PostgreSQLComDescribeExecutor;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
@@ -42,9 +34,6 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.sql.JDBCType;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
@@ -53,9 +42,7 @@ import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -89,8 +76,6 @@ public final class PostgreSQLComBindExecutorTest {
         Collection<DatabasePacket<?>> actual = executor.execute();
         assertThat(actual.size(), is(1));
         assertThat(actual.iterator().next(), 
is(instanceOf(PostgreSQLBindCompletePacket.class)));
-        assertThat(executor.getResponseType(), is(ResponseType.UPDATE));
-        assertFalse(executor.next());
     }
     
     @Test
@@ -99,11 +84,9 @@ public final class PostgreSQLComBindExecutorTest {
         QueryResponseHeader queryResponseHeader = 
mock(QueryResponseHeader.class);
         
when(databaseCommunicationEngine.execute()).thenReturn(queryResponseHeader);
         PostgreSQLComBindExecutor executor = new 
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
-        setMockFieldIntoExecutor(executor);
         Collection<DatabasePacket<?>> actual = executor.execute();
         assertThat(actual.size(), is(1));
         assertThat(actual.iterator().next(), 
is(instanceOf(PostgreSQLBindCompletePacket.class)));
-        assertThat(executor.getResponseType(), is(ResponseType.QUERY));
         verify(queryResponseHeader).getQueryHeaders();
     }
     
@@ -114,66 +97,18 @@ public final class PostgreSQLComBindExecutorTest {
         
when(queryResponseHeader.getQueryHeaders()).thenReturn(Collections.singletonList(new
 QueryHeader("schema", "table", "label", "column", 1, "type", 2, 3, true, true, 
true, true)));
         
when(databaseCommunicationEngine.execute()).thenReturn(queryResponseHeader);
         PostgreSQLComBindExecutor executor = new 
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
-        setMockFieldIntoExecutor(executor);
         Collection<DatabasePacket<?>> actual = executor.execute();
         assertThat(actual.size(), is(1));
         Iterator<DatabasePacket<?>> actualPackets = actual.iterator();
         assertThat(actualPackets.next(), 
is(instanceOf(PostgreSQLBindCompletePacket.class)));
-        assertThat(executor.getResponseType(), is(ResponseType.QUERY));
-    }
-    
-    @Test
-    public void assertNext() throws SQLException {
-        when(databaseCommunicationEngine.next()).thenReturn(true, false);
-        PostgreSQLComBindExecutor executor = new 
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
-        setMockFieldIntoExecutor(executor);
-        assertTrue(executor.next());
-        assertFalse(executor.next());
-    }
-    
-    @Test
-    public void assertDataRowNotBinary() throws SQLException {
-        QueryResponseRow queryResponseRow = mock(QueryResponseRow.class);
-        
when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(queryResponseRow);
-        PostgreSQLComBindExecutor executor = new 
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
-        setMockFieldIntoExecutor(executor);
-        PostgreSQLPacket actualQueryRowPacket = executor.getQueryRowPacket();
-        verify(queryResponseRow).getCells();
-        assertThat(actualQueryRowPacket, 
is(instanceOf(PostgreSQLDataRowPacket.class)));
-    }
-    
-    @Test
-    public void assertDataRowIsBinary() throws SQLException {
-        
when(bindPacket.getResultFormats()).thenReturn(Collections.singletonList(PostgreSQLValueFormat.BINARY));
-        QueryResponseRow queryResponseRow = mock(QueryResponseRow.class);
-        
when(queryResponseRow.getCells()).thenReturn(Collections.singletonList(new 
BinaryQueryResponseCell(JDBCType.BIGINT.getVendorTypeNumber(), 
Long.MAX_VALUE)));
-        
when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(queryResponseRow);
-        PostgreSQLComBindExecutor executor = new 
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
-        setMockFieldIntoExecutor(executor);
-        PostgreSQLPacket actualQueryRowPacket = executor.getQueryRowPacket();
-        assertThat(actualQueryRowPacket, 
is(instanceOf(PostgreSQLDataRowPacket.class)));
-        Collection<Object> actualData = ((PostgreSQLDataRowPacket) 
actualQueryRowPacket).getData();
-        assertThat(actualData.iterator().next(), instanceOf(BinaryCell.class));
     }
     
     @Test
     public void assertExecuteBindPacketWithUpdateSQL() throws SQLException {
         when(databaseCommunicationEngine.execute()).thenReturn(new 
UpdateResponseHeader(mock(InsertStatement.class)));
         PostgreSQLComBindExecutor executor = new 
PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
-        setMockFieldIntoExecutor(executor);
         Collection<DatabasePacket<?>> actual = executor.execute();
         assertThat(actual.size(), is(1));
         assertThat(actual.iterator().next(), 
is(instanceOf(PostgreSQLBindCompletePacket.class)));
-        assertThat(executor.getResponseType(), is(ResponseType.UPDATE));
-    }
-    
-    @SneakyThrows
-    private void setMockFieldIntoExecutor(final PostgreSQLComBindExecutor 
executor) {
-        Field field = 
PostgreSQLComBindExecutor.class.getDeclaredField("databaseCommunicationEngine");
-        field.setAccessible(true);
-        Field modifiers = Field.class.getDeclaredField("modifiers");
-        modifiers.setAccessible(true);
-        modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(executor, databaseCommunicationEngine);
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutorTest.java
index 8e747db..4e8453e 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/close/PostgreSQLComCloseExecutorTest.java
@@ -21,8 +21,8 @@ import 
org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLCloseCompletePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket;
-import 
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,6 +36,7 @@ import java.util.Random;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -44,6 +45,9 @@ public final class PostgreSQLComCloseExecutorTest {
     private static final int CONNECTION_ID = new Random().nextInt() & 
Integer.MAX_VALUE;
     
     @Mock
+    private PostgreSQLConnectionContext connectionContext;
+    
+    @Mock
     private PostgreSQLComClosePacket packet;
     
     @Mock
@@ -59,7 +63,7 @@ public final class PostgreSQLComCloseExecutorTest {
     public void assertExecuteClosePreparedStatement() throws SQLException {
         
when(packet.getType()).thenReturn(PostgreSQLComClosePacket.Type.PREPARED_STATEMENT);
         when(packet.getName()).thenReturn("S_1");
-        PostgreSQLComCloseExecutor closeExecutor = new 
PostgreSQLComCloseExecutor(packet, backendConnection);
+        PostgreSQLComCloseExecutor closeExecutor = new 
PostgreSQLComCloseExecutor(connectionContext, packet, backendConnection);
         Collection<DatabasePacket<?>> actual = closeExecutor.execute();
         assertThat(actual.size(), is(1));
         assertThat(actual.iterator().next(), 
is(instanceOf(PostgreSQLCloseCompletePacket.class)));
@@ -68,9 +72,12 @@ public final class PostgreSQLComCloseExecutorTest {
     @Test
     public void assertExecuteClosePortal() throws SQLException {
         
when(packet.getType()).thenReturn(PostgreSQLComClosePacket.Type.PORTAL);
-        PostgreSQLComCloseExecutor closeExecutor = new 
PostgreSQLComCloseExecutor(packet, backendConnection);
+        String portalName = "C_1";
+        when(packet.getName()).thenReturn(portalName);
+        PostgreSQLComCloseExecutor closeExecutor = new 
PostgreSQLComCloseExecutor(connectionContext, packet, backendConnection);
         Collection<DatabasePacket<?>> actual = closeExecutor.execute();
         assertThat(actual.size(), is(1));
-        assertThat(actual.iterator().next(), 
is(instanceOf(PostgreSQLErrorResponsePacket.class)));
+        assertThat(actual.iterator().next(), 
is(instanceOf(PostgreSQLCloseCompletePacket.class)));
+        verify(connectionContext).closePortal(portalName);
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
index dd46dfb..b9d5841 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
@@ -20,12 +20,14 @@ package 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
+import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLComExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
-import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
+import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.PostgreSQLPortal;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -35,13 +37,13 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -52,6 +54,12 @@ public final class PostgreSQLComExecuteExecutorTest {
     private PostgreSQLConnectionContext connectionContext;
     
     @Mock
+    private PostgreSQLComExecutePacket packet;
+    
+    @Mock
+    private PostgreSQLPortal portal;
+    
+    @Mock
     private QueryCommandExecutor queryCommandExecutor;
     
     @Mock
@@ -60,35 +68,33 @@ public final class PostgreSQLComExecuteExecutorTest {
     @Mock
     private PostgreSQLDataRowPacket dataRowPacket;
     
+    @Before
+    public void setup() {
+        when(packet.getPortal()).thenReturn("");
+        when(connectionContext.getPortal(anyString())).thenReturn(portal);
+    }
+    
     @Test
     public void assertExecuteQuery() throws SQLException {
         when(connectionContext.getPendingExecutors()).thenReturn(new 
ArrayList<>(Collections.singletonList(queryCommandExecutor)));
         
when(queryCommandExecutor.execute()).thenReturn(Collections.singletonList(postgreSQLPacket));
-        when(queryCommandExecutor.next()).thenReturn(true, false);
-        when((PostgreSQLDataRowPacket) 
queryCommandExecutor.getQueryRowPacket()).thenReturn(dataRowPacket);
-        PostgreSQLComExecuteExecutor actual = new 
PostgreSQLComExecuteExecutor(connectionContext);
+        when(portal.next()).thenReturn(true, false);
+        when(portal.nextPacket()).thenReturn(dataRowPacket);
+        PostgreSQLComExecuteExecutor actual = new 
PostgreSQLComExecuteExecutor(connectionContext, packet);
         Collection<DatabasePacket<?>> actualPackets = actual.execute();
-        assertThat(actualPackets.size(), is(1));
-        assertThat(actualPackets.iterator().next(), is(postgreSQLPacket));
-        assertTrue(actual.next());
-        assertThat(actual.getQueryRowPacket(), is(dataRowPacket));
-        assertTrue(actual.next());
-        assertThat(actual.getQueryRowPacket(), 
is(instanceOf(PostgreSQLCommandCompletePacket.class)));
-        assertFalse(actual.next());
+        assertThat(actualPackets.size(), is(3));
+        Iterator<DatabasePacket<?>> actualPacketsIterator = 
actualPackets.iterator();
+        assertThat(actualPacketsIterator.next(), is(postgreSQLPacket));
+        assertThat(actualPacketsIterator.next(), is(dataRowPacket));
+        assertThat(actualPacketsIterator.next(), 
instanceOf(PostgreSQLCommandCompletePacket.class));
     }
     
     @Test
     public void assertExecuteUpdate() throws SQLException {
         
when(connectionContext.getSqlStatement()).thenReturn(Optional.of(mock(EmptyStatement.class)));
-        PostgreSQLComExecuteExecutor actual = new 
PostgreSQLComExecuteExecutor(connectionContext);
-        assertTrue(actual.next());
-        assertThat(actual.getQueryRowPacket(), 
is(instanceOf(PostgreSQLEmptyQueryResponsePacket.class)));
-        assertFalse(actual.next());
-    }
-    
-    @Test
-    public void assertResponseType() {
-        ResponseType actual = new 
PostgreSQLComExecuteExecutor(connectionContext).getResponseType();
-        assertThat(actual, is(ResponseType.QUERY));
+        when(portal.next()).thenReturn(false);
+        Collection<DatabasePacket<?>> actual = new 
PostgreSQLComExecuteExecutor(connectionContext, packet).execute();
+        assertThat(actual.size(), is(1));
+        assertThat(actual.iterator().next(), 
is(instanceOf(PostgreSQLEmptyQueryResponsePacket.class)));
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutorTest.java
index b127d89..d057907 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutorTest.java
@@ -30,6 +30,7 @@ import 
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryH
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,6 +57,9 @@ import static org.mockito.Mockito.when;
 public final class PostgreSQLComQueryExecutorTest {
     
     @Mock
+    private PostgreSQLConnectionContext connectionContext;
+    
+    @Mock
     private TextProtocolBackendHandler textProtocolBackendHandler;
     
     private PostgreSQLComQueryExecutor queryExecutor;
@@ -65,7 +69,7 @@ public final class PostgreSQLComQueryExecutorTest {
         PostgreSQLComQueryPacket queryPacket = 
mock(PostgreSQLComQueryPacket.class);
         BackendConnection backendConnection = mock(BackendConnection.class);
         when(queryPacket.getSql()).thenReturn("");
-        queryExecutor = new PostgreSQLComQueryExecutor(queryPacket, 
backendConnection);
+        queryExecutor = new PostgreSQLComQueryExecutor(connectionContext, 
queryPacket, backendConnection);
         setMockFieldIntoExecutor(queryExecutor);
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
index c1aa8a4..cc94e31 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
@@ -34,4 +34,12 @@ public interface CommandExecutor {
      * @throws SQLException SQL exception
      */
     Collection<DatabasePacket<?>> execute() throws SQLException;
+    
+    /**
+     * Close command executor.
+     *
+     * @throws SQLException SQL exception
+     */
+    default void close() throws SQLException {
+    }
 }

Reply via email to