This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f9fc2f352f5 Move MySQLPreparedStatementRegistry into ConnectionSession (#18545)
f9fc2f352f5 is described below

commit f9fc2f352f5806f2d553d512e2704dca6aa2e208
Author: 吴伟杰 <[email protected]>
AuthorDate: Thu Jun 23 20:41:55 2022 +0800

    Move MySQLPreparedStatementRegistry into ConnectionSession (#18545)
    
    * Refactor PreparedStatement of MySQL Proxy
    
    * Complete tests in db-protocol-mysql
    
    * Complete MySQLCommandPacketFactoryTest
    
    * Complete MySQLComStmtExecuteExecutorTest
    
    * Add PreparedStatementRegistryTest
    
    * Add MySQLComStmtCloseExecutorTest
    
    * Complete MySQLComStmtPrepareExecutorTest
    
    * Fix code style in MySQLComStmtPrepareExecutorTest
    
    * Move MySQLStatementIDGenerator from protocol to frontend
---
 .../binary/MySQLPreparedStatementRegistry.java     | 114 ---------------------
 .../binary/MySQLPreparedStatementRegistryTest.java |  78 --------------
 .../execute/MySQLComStmtExecutePacketTest.java     |  11 --
 .../proxy/backend/session/ConnectionSession.java   |   2 +
 .../proxy/backend/session/PreparedStatement.java   |  37 ++++---
 .../backend/session/PreparedStatementRegistry.java |  60 +++++++++++
 .../session/PreparedStatementRegistryTest.java     |  57 +++++++++++
 .../proxy/frontend/mysql/MySQLFrontendEngine.java  |   4 +-
 .../authentication/MySQLAuthenticationEngine.java  |   4 +-
 .../mysql/command/MySQLCommandExecuteEngine.java   |   2 +-
 .../mysql/command/MySQLCommandExecutorFactory.java |   2 +-
 .../mysql/command/MySQLCommandPacketFactory.java   |  11 +-
 .../query/binary/MySQLPreparedStatement.java       |   9 +-
 .../query/binary/MySQLStatementIDGenerator.java    |  73 +++++++++++++
 .../binary/close/MySQLComStmtCloseExecutor.java    |  14 +--
 .../execute/MySQLComStmtExecuteExecutor.java       |  17 +--
 .../prepare/MySQLComStmtPrepareExecutor.java       |   9 +-
 .../command/MySQLCommandPacketFactoryTest.java     |  79 +++++++-------
 .../binary/MySQLStatementIDGeneratorTest.java      |  46 +++++++++
 .../close/MySQLComStmtCloseExecutorTest.java       |  44 ++++++++
 .../execute/MySQLComStmtExecuteExecutorTest.java   |  45 ++++++--
 .../prepare/MySQLComStmtPrepareExecutorTest.java   |  93 ++++++++++++++++-
 .../ReactiveMySQLComStmtExecuteExecutor.java       |  19 ++--
 23 files changed, 521 insertions(+), 309 deletions(-)

diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java
deleted file mode 100644
index 273e56dfccf..00000000000
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * MySQL prepared statement registry.
- */
-@NoArgsConstructor(access = AccessLevel.NONE)
-public final class MySQLPreparedStatementRegistry {
-    
-    private static final MySQLPreparedStatementRegistry INSTANCE = new 
MySQLPreparedStatementRegistry();
-    
-    private final ConcurrentMap<Integer, MySQLConnectionPreparedStatements> 
connectionRegistry = new ConcurrentHashMap<>(8192, 1);
-    
-    /**
-     * Get prepared statement registry instance.
-     *
-     * @return prepared statement registry instance
-     */
-    public static MySQLPreparedStatementRegistry getInstance() {
-        return INSTANCE;
-    }
-    
-    /**
-     * Register connection.
-     *
-     * @param connectionId connection ID
-     */
-    public void registerConnection(final int connectionId) {
-        connectionRegistry.put(connectionId, new 
MySQLConnectionPreparedStatements());
-    }
-    
-    /**
-     * Get connection prepared statements.
-     * 
-     * @param connectionId connection ID
-     * @return MySQL connection prepared statements
-     */
-    public MySQLConnectionPreparedStatements 
getConnectionPreparedStatements(final int connectionId) {
-        return connectionRegistry.get(connectionId);
-    }
-    
-    /**
-     * Unregister connection.
-     *
-     * @param connectionId connection ID
-     */
-    public void unregisterConnection(final int connectionId) {
-        connectionRegistry.remove(connectionId);
-    }
-    
-    public static class MySQLConnectionPreparedStatements {
-        
-        private final Map<Integer, MySQLPreparedStatement> preparedStatements 
= new ConcurrentHashMap<>(16384, 1);
-        
-        private final AtomicInteger sequence = new AtomicInteger();
-        
-        /**
-         * Prepare statement.
-         *
-         * @param sql SQL
-         * @param sqlStatement sql statement of prepared statement
-         * @return statement ID
-         */
-        public int prepareStatement(final String sql, final SQLStatement 
sqlStatement) {
-            int result = sequence.incrementAndGet();
-            preparedStatements.put(result, new MySQLPreparedStatement(sql, 
sqlStatement));
-            return result;
-        }
-        
-        /**
-         * Get prepared statement.
-         *
-         * @param statementId statement ID
-         * @return prepared statement
-         */
-        public MySQLPreparedStatement get(final int statementId) {
-            return preparedStatements.get(statementId);
-        }
-        
-        /**
-         * Close statement.
-         *
-         * @param statementId statement ID
-         */
-        public void closeStatement(final int statementId) {
-            preparedStatements.remove(statementId);
-        }
-    }
-}
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
deleted file mode 100644
index 3808fa3a86b..00000000000
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
-
-import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-public final class MySQLPreparedStatementRegistryTest {
-    
-    private static final int CONNECTION_ID = 1;
-    
-    private static final String SQL = "SELECT * FROM tbl WHERE id=?";
-    
-    @Before
-    public void setup() {
-        
MySQLPreparedStatementRegistry.getInstance().registerConnection(CONNECTION_ID);
-    }
-    
-    @Test
-    public void assertRegisterIfAbsent() {
-        
assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL,
 prepareSQLStatement()), is(1));
-        MySQLPreparedStatement actual = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
-        assertThat(actual.getSql(), is(SQL));
-        assertThat(actual.getSqlStatement().getParameterCount(), is(1));
-    }
-    
-    @Test
-    public void assertPrepareSameSQL() {
-        
assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL,
 prepareSQLStatement()), is(1));
-        
assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL,
 prepareSQLStatement()), is(2));
-        MySQLPreparedStatement actual = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
-        assertThat(actual.getSql(), is(SQL));
-        assertThat(actual.getSqlStatement().getParameterCount(), is(1));
-        actual = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
-        assertThat(actual.getSql(), is(SQL));
-        assertThat(actual.getSqlStatement().getParameterCount(), is(1));
-    }
-    
-    @Test
-    public void assertCloseStatement() {
-        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL,
 prepareSQLStatement());
-        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).closeStatement(1);
-        MySQLPreparedStatement actual = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
-        assertNull(actual);
-    }
-    
-    private MySQLSelectStatement prepareSQLStatement() {
-        MySQLSelectStatement result = new MySQLSelectStatement();
-        result.setParameterCount(1);
-        return result;
-    }
-    
-    @After
-    public void tearDown() {
-        
MySQLPreparedStatementRegistry.getInstance().unregisterConnection(CONNECTION_ID);
-    }
-}
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
index 2a57d47c9ae..bf6677eb23e 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
@@ -21,10 +21,7 @@ import io.netty.buffer.Unpooled;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
-import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
@@ -39,14 +36,6 @@ import static org.junit.Assert.assertTrue;
 
 public final class MySQLComStmtExecutePacketTest {
     
-    @Before
-    public void setup() {
-        MySQLPreparedStatementRegistry.getInstance().registerConnection(1);
-        MySQLSelectStatement sqlStatement = new MySQLSelectStatement();
-        sqlStatement.setParameterCount(1);
-        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("SELECT
 id FROM tbl WHERE id=?", sqlStatement);
-    }
-    
     @Test
     public void assertNewWithoutParameter() throws SQLException {
         byte[] data = {0x01, 0x00, 0x00, 0x00, 0x09, 0x01, 0x00, 0x00, 0x00};
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
index c807e19b46d..75b0254e603 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
@@ -78,6 +78,8 @@ public final class ConnectionSession {
     
     private final Map<String, CursorStatementContext> cursorDefinitions = new 
ConcurrentHashMap<>();
     
+    private final PreparedStatementRegistry preparedStatementRegistry = new 
PreparedStatementRegistry();
+    
     public ConnectionSession(final DatabaseType databaseType, final 
TransactionType initialTransactionType, final AttributeMap attributeMap) {
         this.databaseType = databaseType;
         transactionStatus = new TransactionStatus(initialTransactionType);
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatement.java
similarity index 57%
copy from 
shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
copy to 
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatement.java
index 1b356b54413..daeb62cc0f2 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatement.java
@@ -15,27 +15,34 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+package org.apache.shardingsphere.proxy.backend.session;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
-import java.util.Collections;
-import java.util.List;
-
 /**
- * Binary prepared statement for MySQL.
+ * Logic prepared statement for clients of ShardingSphere-Proxy.
  */
-@RequiredArgsConstructor
-@Getter
-@Setter
-public final class MySQLPreparedStatement {
+public interface PreparedStatement {
     
-    private final String sql;
+    /**
+     * Get SQL of prepared statement.
+     *
+     * @return SQL
+     */
+    String getSql();
     
-    private final SQLStatement sqlStatement;
+    /**
+     * Get {@link SQLStatement} of prepared statement.
+     *
+     * @return {@link SQLStatement}
+     */
+    SQLStatement getSqlStatement();
     
-    private List<MySQLPreparedStatementParameterType> parameterTypes = 
Collections.emptyList();
+    /**
+     * Get {@link SQLStatementContext} of prepared statement.
+     *
+     * @return {@link SQLStatementContext}
+     */
+    SQLStatementContext<?> getSqlStatementContext();
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistry.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistry.java
new file mode 100644
index 00000000000..b474a5deaa3
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link PreparedStatement} registry for {@link ConnectionSession}.
+ */
+public final class PreparedStatementRegistry {
+    
+    private final Map<Object, PreparedStatement> preparedStatements = new 
ConcurrentHashMap<>();
+    
+    /**
+     * Add {@link PreparedStatement} into registry.
+     *
+     * @param statementId statement ID
+     * @param preparedStatement prepared statement
+     */
+    public void addPreparedStatement(final Object statementId, final 
PreparedStatement preparedStatement) {
+        preparedStatements.put(statementId, preparedStatement);
+    }
+    
+    /**
+     * Get prepared statement by statement ID.
+     *
+     * @param <T> implementation of {@link PreparedStatement}
+     * @param statementId statement ID
+     * @return {@link PreparedStatement}
+     */
+    @SuppressWarnings("unchecked")
+    public <T extends PreparedStatement> T getPreparedStatement(final Object 
statementId) {
+        return (T) preparedStatements.get(statementId);
+    }
+    
+    /**
+     * Remove {@link PreparedStatement} from registry.
+     *
+     * @param statementId statement ID
+     */
+    public void removePreparedStatement(final Object statementId) {
+        preparedStatements.remove(statementId);
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistryTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistryTest.java
new file mode 100644
index 00000000000..6ad372c6973
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class PreparedStatementRegistryTest {
+    
+    @Test
+    public void assertAddAndGetAndClosePreparedStatement() {
+        PreparedStatement expected = new DummyPreparedStatement();
+        PreparedStatementRegistry registry = new PreparedStatementRegistry();
+        registry.addPreparedStatement(1, expected);
+        assertThat(registry.getPreparedStatement(1), is(expected));
+        registry.removePreparedStatement(1);
+        assertNull(registry.getPreparedStatement(1));
+    }
+    
+    private static class DummyPreparedStatement implements PreparedStatement {
+        
+        @Override
+        public String getSql() {
+            throw new UnsupportedOperationException();
+        }
+        
+        @Override
+        public SQLStatement getSqlStatement() {
+            throw new UnsupportedOperationException();
+        }
+        
+        @Override
+        public SQLStatementContext<?> getSqlStatementContext() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
index 6df5b49db04..92ed5880826 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
 import 
org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerInfo;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -59,7 +59,7 @@ public final class MySQLFrontendEngine implements 
DatabaseProtocolFrontendEngine
     
     @Override
     public void release(final ConnectionSession connectionSession) {
-        
MySQLPreparedStatementRegistry.getInstance().unregisterConnection(connectionSession.getConnectionId());
+        
MySQLStatementIDGenerator.getInstance().unregisterConnection(connectionSession.getConnectionId());
     }
     
     @Override
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
index 0ec0494b7d2..8a5a0fcd738 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConnectionPhase
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerErrorCode;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLStatusFlag;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLAuthSwitchRequestPacket;
@@ -68,7 +68,7 @@ public final class MySQLAuthenticationEngine implements 
AuthenticationEngine {
         int result = ConnectionIdGenerator.getInstance().nextId();
         connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH;
         context.writeAndFlush(new MySQLHandshakePacket(result, 
authenticationHandler.getAuthPluginData()));
-        
MySQLPreparedStatementRegistry.getInstance().registerConnection(result);
+        MySQLStatementIDGenerator.getInstance().registerConnection(result);
         return result;
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
index 5721171c8a5..9567549c33b 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
@@ -53,7 +53,7 @@ public final class MySQLCommandExecuteEngine implements 
CommandExecuteEngine {
     
     @Override
     public MySQLCommandPacket getCommandPacket(final PacketPayload payload, 
final CommandPacketType type, final ConnectionSession connectionSession) throws 
SQLException {
-        return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) 
type, (MySQLPacketPayload) payload, connectionSession.getConnectionId());
+        return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) 
type, (MySQLPacketPayload) payload, connectionSession);
     }
     
     @Override
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
index 70e296d8a0c..2eed6d6fb6a 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
@@ -82,7 +82,7 @@ public final class MySQLCommandExecutorFactory {
             case COM_STMT_RESET:
                 return new MySQLComStmtResetExecutor((MySQLComStmtResetPacket) 
commandPacket, connectionSession);
             case COM_STMT_CLOSE:
-                return new MySQLComStmtCloseExecutor((MySQLComStmtClosePacket) 
commandPacket, connectionSession.getConnectionId());
+                return new MySQLComStmtCloseExecutor((MySQLComStmtClosePacket) 
commandPacket, connectionSession);
             case COM_SET_OPTION:
                 return new MySQLComSetOptionExecutor((MySQLComSetOptionPacket) 
commandPacket, connectionSession);
             default:
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
index eec4205d8f4..ed514ac4224 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
@@ -26,8 +26,6 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -35,6 +33,8 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.r
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fieldlist.MySQLComFieldListPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 
 import java.sql.SQLException;
 
@@ -49,11 +49,11 @@ public final class MySQLCommandPacketFactory {
      *
      * @param commandPacketType command packet type for MySQL
      * @param payload packet payload for MySQL
-     * @param connectionId connection ID
+     * @param connectionSession connection session
      * @return created instance
      * @throws SQLException SQL exception
      */
-    public static MySQLCommandPacket newInstance(final MySQLCommandPacketType 
commandPacketType, final MySQLPacketPayload payload, final int connectionId) 
throws SQLException {
+    public static MySQLCommandPacket newInstance(final MySQLCommandPacketType 
commandPacketType, final MySQLPacketPayload payload, final ConnectionSession 
connectionSession) throws SQLException {
         switch (commandPacketType) {
             case COM_QUIT:
                 return new MySQLComQuitPacket();
@@ -66,8 +66,7 @@ public final class MySQLCommandPacketFactory {
             case COM_STMT_PREPARE:
                 return new MySQLComStmtPreparePacket(payload);
             case COM_STMT_EXECUTE:
-                MySQLPreparedStatement preparedStatement = 
MySQLPreparedStatementRegistry.getInstance()
-                        
.getConnectionPreparedStatements(connectionId).get(payload.getByteBuf().getIntLE(payload.getByteBuf().readerIndex()));
+                MySQLPreparedStatement preparedStatement = 
connectionSession.getPreparedStatementRegistry().getPreparedStatement(payload.getByteBuf().getIntLE(payload.getByteBuf().readerIndex()));
                 return new MySQLComStmtExecutePacket(payload, 
preparedStatement.getSqlStatement().getParameterCount());
             case COM_STMT_RESET:
                 return new MySQLComStmtResetPacket(payload);
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
similarity index 72%
rename from 
shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
rename to 
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
index 1b356b54413..9588539f3a2 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
@@ -15,11 +15,14 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.proxy.backend.session.PreparedStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.util.Collections;
@@ -31,11 +34,13 @@ import java.util.List;
 @RequiredArgsConstructor
 @Getter
 @Setter
-public final class MySQLPreparedStatement {
+public final class MySQLPreparedStatement implements PreparedStatement {
     
     private final String sql;
     
     private final SQLStatement sqlStatement;
     
+    private final SQLStatementContext<?> sqlStatementContext;
+    
     private List<MySQLPreparedStatementParameterType> parameterTypes = 
Collections.emptyList();
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGenerator.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGenerator.java
new file mode 100644
index 00000000000..d43619c7a99
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGenerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Singleton generator of per-connection statement IDs for MySQL prepared statements.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class MySQLStatementIDGenerator {
+    
+    private static final MySQLStatementIDGenerator INSTANCE = new 
MySQLStatementIDGenerator();
+    
+    private final Map<Integer, AtomicInteger> connectionRegistry = new 
ConcurrentHashMap<>();
+    
+    /**
+     * Get statement ID generator instance.
+     *
+     * @return statement ID generator instance
+     */
+    public static MySQLStatementIDGenerator getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Register connection, (re)starting its statement ID counter at zero.
+     *
+     * @param connectionId connection ID
+     */
+    public void registerConnection(final int connectionId) {
+        connectionRegistry.put(connectionId, new AtomicInteger());
+    }
+    
+    /**
+     * Generate next statement ID for a connection which has been registered.
+     *
+     * @param connectionId connection ID
+     * @return generated statement ID for prepared statement, first ID is 1
+     */
+    public int nextStatementId(final int connectionId) {
+        return connectionRegistry.get(connectionId).incrementAndGet();
+    }
+    
+    /**
+     * Unregister connection and discard its statement ID counter.
+     *
+     * @param connectionId connection ID
+     */
+    public void unregisterConnection(final int connectionId) {
+        connectionRegistry.remove(connectionId);
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
index 9bf33030490..df9bb3962a4 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
@@ -18,10 +18,9 @@
 package 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close;
 
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry.MySQLConnectionPreparedStatements;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
 
 import java.util.Collection;
@@ -35,18 +34,11 @@ public final class MySQLComStmtCloseExecutor implements 
CommandExecutor {
     
     private final MySQLComStmtClosePacket packet;
     
-    private final int connectionId;
+    private final ConnectionSession connectionSession;
     
     @Override
     public Collection<DatabasePacket<?>> execute() {
-        MySQLConnectionPreparedStatements connectionPreparedStatements = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionId);
-        if (!connectionReleased(connectionPreparedStatements)) {
-            
connectionPreparedStatements.closeStatement(packet.getStatementId());
-        }
+        
connectionSession.getPreparedStatementRegistry().removePreparedStatement(packet.getStatementId());
         return Collections.emptyList();
     }
-    
-    private boolean connectionReleased(final MySQLConnectionPreparedStatements 
connectionPreparedStatements) {
-        return null == connectionPreparedStatements;
-    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index bd867db3ebe..5aaf00ef658 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -25,12 +25,10 @@ import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.aware.ParameterAware;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -53,6 +51,7 @@ import 
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFa
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
@@ -88,19 +87,21 @@ public final class MySQLComStmtExecuteExecutor implements 
QueryCommandExecutor {
     @Override
     public Collection<DatabasePacket<?>> execute() throws SQLException {
         MySQLPreparedStatement preparedStatement = 
updateAndGetPreparedStatement();
-        String databaseName = connectionSession.getDatabaseName();
-        MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         SQLStatement sqlStatement = preparedStatement.getSqlStatement();
         if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
             connectionSession.getBackendConnection().handleAutoCommit();
         }
         List<Object> parameters = 
packet.readParameters(preparedStatement.getParameterTypes());
-        SQLStatementContext<?> sqlStatementContext = 
SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(),
 parameters,
-                sqlStatement, connectionSession.getDefaultDatabaseName());
+        SQLStatementContext<?> sqlStatementContext = 
preparedStatement.getSqlStatementContext();
+        if (sqlStatementContext instanceof ParameterAware) {
+            ((ParameterAware) sqlStatementContext).setUpParameters(parameters);
+        }
         // TODO optimize SQLStatementDatabaseHolder
         if (sqlStatementContext instanceof TableAvailable) {
             ((TableAvailable) 
sqlStatementContext).getTablesContext().getDatabaseName().ifPresent(SQLStatementDatabaseHolder::set);
         }
+        String databaseName = connectionSession.getDatabaseName();
+        MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         SQLCheckEngine.check(sqlStatement, Collections.emptyList(), 
getRules(databaseName), databaseName, 
metaDataContexts.getMetaData().getDatabases(), connectionSession.getGrantee());
         // TODO Refactor the following branch
         if (sqlStatement instanceof TCLStatement) {
@@ -116,7 +117,7 @@ public final class MySQLComStmtExecuteExecutor implements 
QueryCommandExecutor {
     }
     
     private MySQLPreparedStatement updateAndGetPreparedStatement() {
-        MySQLPreparedStatement result = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionSession.getConnectionId()).get(packet.getStatementId());
+        MySQLPreparedStatement result = 
connectionSession.getPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
         if (MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST == 
packet.getNewParametersBoundFlag()) {
             result.setParameterTypes(packet.getNewParameterTypes());
         }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
index 15cf078b362..99c39c12287 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
@@ -22,12 +22,13 @@ import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPrepareOKPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -38,6 +39,7 @@ import 
org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedStatementException;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 
@@ -70,7 +72,10 @@ public final class MySQLComStmtPrepareExecutor implements 
CommandExecutor {
             throw new UnsupportedPreparedStatementException();
         }
         int projectionCount = getProjectionCount(sqlStatement);
-        int statementId = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionSession.getConnectionId()).prepareStatement(packet.getSql(),
 sqlStatement);
+        int statementId = 
MySQLStatementIDGenerator.getInstance().nextStatementId(connectionSession.getConnectionId());
+        SQLStatementContext<?> sqlStatementContext = 
SQLStatementContextFactory.newInstance(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabases(),
+                sqlStatement, connectionSession.getDefaultDatabaseName());
+        
connectionSession.getPreparedStatementRegistry().addPreparedStatement(statementId,
 new MySQLPreparedStatement(packet.getSql(), sqlStatement, 
sqlStatementContext));
         return createPackets(statementId, projectionCount, 
sqlStatement.getParameterCount());
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
index 99ef170e35f..276eb511ff9 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
@@ -24,7 +24,6 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -32,6 +31,9 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.r
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fieldlist.MySQLComFieldListPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -49,35 +51,36 @@ import static org.mockito.Mockito.when;
 @RunWith(MockitoJUnitRunner.class)
 public final class MySQLCommandPacketFactoryTest {
     
-    private static final int CONNECTION_ID = 1;
-    
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private MySQLPacketPayload payload;
     
+    @Mock
+    private ConnectionSession connectionSession;
+    
     @Test
     public void assertNewInstanceWithComQuitPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUIT,
 payload, CONNECTION_ID), instanceOf(MySQLComQuitPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUIT,
 payload, connectionSession), instanceOf(MySQLComQuitPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComInitDbPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_INIT_DB,
 payload, CONNECTION_ID), instanceOf(MySQLComInitDbPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_INIT_DB,
 payload, connectionSession), instanceOf(MySQLComInitDbPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComFieldListPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST,
 payload, CONNECTION_ID), instanceOf(MySQLComFieldListPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST,
 payload, connectionSession), instanceOf(MySQLComFieldListPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComQueryPacket() throws SQLException {
         when(payload.readStringEOF()).thenReturn("SHOW TABLES");
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUERY,
 payload, CONNECTION_ID), instanceOf(MySQLComQueryPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUERY,
 payload, connectionSession), instanceOf(MySQLComQueryPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtPreparePacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_PREPARE,
 payload, CONNECTION_ID), instanceOf(MySQLComStmtPreparePacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_PREPARE,
 payload, connectionSession), instanceOf(MySQLComStmtPreparePacket.class));
     }
     
     @Test
@@ -85,139 +88,139 @@ public final class MySQLCommandPacketFactoryTest {
         
when(payload.readInt1()).thenReturn(MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST.getValue());
         when(payload.readInt4()).thenReturn(1);
         when(payload.getByteBuf().getIntLE(anyInt())).thenReturn(1);
-        
MySQLPreparedStatementRegistry.getInstance().registerConnection(CONNECTION_ID);
-        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement("SELECT
 * FROM t_order", new MySQLSelectStatement());
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_EXECUTE,
 payload, CONNECTION_ID), instanceOf(MySQLComStmtExecutePacket.class));
-        
MySQLPreparedStatementRegistry.getInstance().unregisterConnection(CONNECTION_ID);
+        PreparedStatementRegistry preparedStatementRegistry = new 
PreparedStatementRegistry();
+        
when(connectionSession.getPreparedStatementRegistry()).thenReturn(preparedStatementRegistry);
+        preparedStatementRegistry.addPreparedStatement(1, new 
MySQLPreparedStatement("select 1", new MySQLSelectStatement(), null));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_EXECUTE,
 payload, connectionSession), instanceOf(MySQLComStmtExecutePacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtClosePacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_CLOSE,
 payload, CONNECTION_ID), instanceOf(MySQLComStmtClosePacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_CLOSE,
 payload, connectionSession), instanceOf(MySQLComStmtClosePacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComPingPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PING,
 payload, CONNECTION_ID), instanceOf(MySQLComPingPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PING,
 payload, connectionSession), instanceOf(MySQLComPingPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComSleepPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SLEEP,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SLEEP,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComCreateDbPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CREATE_DB,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CREATE_DB,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDropDbPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DROP_DB,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DROP_DB,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComRefreshPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REFRESH,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REFRESH,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComShutDownPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SHUTDOWN,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SHUTDOWN,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStatisticsPacket() throws SQLException 
{
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STATISTICS,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STATISTICS,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComProcessInfoPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_INFO,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_INFO,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComConnectPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComProcessKillPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_KILL,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_KILL,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDebugPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DEBUG,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DEBUG,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComTimePacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TIME,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TIME,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDelayedInsertPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DELAYED_INSERT,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DELAYED_INSERT,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComChangeUserPacket() throws SQLException 
{
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CHANGE_USER,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CHANGE_USER,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComBinlogDumpPacket() throws SQLException 
{
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComTableDumpPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TABLE_DUMP,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TABLE_DUMP,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComConnectOutPacket() throws SQLException 
{
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT_OUT,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT_OUT,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComRegisterSlavePacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REGISTER_SLAVE,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REGISTER_SLAVE,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtSendLongDataPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtResetPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET,
 payload, CONNECTION_ID), instanceOf(MySQLComStmtResetPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET,
 payload, connectionSession), instanceOf(MySQLComStmtResetPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComSetOptionPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SET_OPTION,
 payload, CONNECTION_ID), instanceOf(MySQLComSetOptionPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SET_OPTION,
 payload, connectionSession), instanceOf(MySQLComSetOptionPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtFetchPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_FETCH,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_FETCH,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDaemonPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DAEMON,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DAEMON,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComBinlogDumpGTIDPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP_GTID,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP_GTID,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComResetConnectionPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_RESET_CONNECTION,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_RESET_CONNECTION,
 payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGeneratorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGeneratorTest.java
new file mode 100644
index 00000000000..28f52a8d966
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGeneratorTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class MySQLStatementIDGeneratorTest {
+    
+    private static final int CONNECTION_ID = 1;
+    
+    @Before
+    public void setup() {
+        
MySQLStatementIDGenerator.getInstance().registerConnection(CONNECTION_ID);
+    }
+    
+    @Test
+    public void assertNextStatementId() {
+        
assertThat(MySQLStatementIDGenerator.getInstance().nextStatementId(CONNECTION_ID),
 is(1));
+        
assertThat(MySQLStatementIDGenerator.getInstance().nextStatementId(CONNECTION_ID),
 is(2));
+    }
+    
+    @After
+    public void tearDown() {
+        
MySQLStatementIDGenerator.getInstance().unregisterConnection(CONNECTION_ID);
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutorTest.java
new file mode 100644
index 00000000000..a1c2da2706e
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close;
+
+import io.netty.buffer.Unpooled;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
+import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public final class MySQLComStmtCloseExecutorTest {
+    
+    @Test
+    public void assertExecute() {
+        MySQLComStmtClosePacket packet = new MySQLComStmtClosePacket(new 
MySQLPacketPayload(Unpooled.wrappedBuffer(new byte[]{0x01, 0x00, 0x00, 0x00}), 
StandardCharsets.UTF_8));
+        ConnectionSession connectionSession = mock(ConnectionSession.class, 
RETURNS_DEEP_STUBS);
+        assertThat(new MySQLComStmtCloseExecutor(packet, 
connectionSession).execute(), is(Collections.emptyList()));
+        
verify(connectionSession.getPreparedStatementRegistry()).removePreparedStatement(1);
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
index eb875fe6104..c60ea2db6b1 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
@@ -19,14 +19,19 @@ package 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.exec
 
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
+import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLFieldCountPacket;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import 
org.apache.shardingsphere.infra.binder.statement.CommonSQLStatementContext;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
@@ -40,6 +45,8 @@ import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicati
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
+import 
org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
@@ -48,6 +55,7 @@ import 
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import org.apache.shardingsphere.proxy.frontend.mysql.ProxyContextRestorer;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.ColumnAssignmentSegment;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.SetAssignmentSegment;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.ColumnSegment;
@@ -66,8 +74,10 @@ import org.mockito.MockedStatic;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Supplier;
 
@@ -75,6 +85,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -82,6 +93,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -106,15 +118,17 @@ public final class MySQLComStmtExecuteExecutorTest 
extends ProxyContextRestorer
         ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
         
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
         ProxyContext.init(contextManager);
-        when(connectionSession.getConnectionId()).thenReturn(1);
         
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_GENERAL_CI);
         
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
         when(connectionSession.getDatabaseName()).thenReturn("logic_db");
-        
when(connectionSession.getDefaultDatabaseName()).thenReturn("logic_db");
-        MySQLPreparedStatementRegistry.getInstance().registerConnection(1);
-        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("select
 * from tbl where id = ?", prepareSelectStatement());
-        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("update
 tbl set col=1 where id = ?", prepareUpdateStatement());
-        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("commit",
 new MySQLCommitStatement());
+        SQLStatementContext<?> selectStatementContext = 
prepareSelectStatementContext();
+        
when(connectionSession.getPreparedStatementRegistry().getPreparedStatement(1))
+                .thenReturn(new MySQLPreparedStatement("select * from tbl 
where id = ?", prepareSelectStatement(), selectStatementContext));
+        UpdateStatementContext updateStatementContext = 
mock(UpdateStatementContext.class, RETURNS_DEEP_STUBS);
+        
when(connectionSession.getPreparedStatementRegistry().getPreparedStatement(2))
+                .thenReturn(new MySQLPreparedStatement("update tbl set col=1 
where id = ?", prepareUpdateStatement(), updateStatementContext));
+        
when(connectionSession.getPreparedStatementRegistry().getPreparedStatement(3))
+                .thenReturn(new MySQLPreparedStatement("commit", new 
MySQLCommitStatement(), new CommonSQLStatementContext<>(new 
MySQLCommitStatement())));
     }
     
     private ShardingSphereDatabase mockDatabase() {
@@ -131,6 +145,12 @@ public final class MySQLComStmtExecuteExecutorTest extends 
ProxyContextRestorer
         return sqlStatement;
     }
     
+    private SQLStatementContext<?> prepareSelectStatementContext() {
+        SelectStatementContext result = mock(SelectStatementContext.class, 
RETURNS_DEEP_STUBS);
+        
when(result.getTablesContext().getDatabaseName()).thenReturn(Optional.empty());
+        return result;
+    }
+    
     private MySQLUpdateStatement prepareUpdateStatement() {
         MySQLUpdateStatement result = new MySQLUpdateStatement();
         ColumnSegment columnSegment = new ColumnSegment(0, 0, new 
IdentifierValue("col"));
@@ -145,6 +165,8 @@ public final class MySQLComStmtExecuteExecutorTest extends 
ProxyContextRestorer
         when(packet.getStatementId()).thenReturn(1);
         MySQLComStmtExecuteExecutor mysqlComStmtExecuteExecutor = new 
MySQLComStmtExecuteExecutor(packet, connectionSession);
         when(databaseCommunicationEngine.execute()).thenReturn(new 
QueryResponseHeader(Collections.singletonList(mock(QueryHeader.class))));
+        when(databaseCommunicationEngine.next()).thenReturn(true, false);
+        when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(new 
QueryResponseRow(Collections.singletonList(new 
BinaryQueryResponseCell(Types.INTEGER, 1))));
         Iterator<DatabasePacket<?>> actual;
         try (MockedStatic<DatabaseCommunicationEngineFactory> mockedStatic = 
mockStatic(DatabaseCommunicationEngineFactory.class, RETURNS_DEEP_STUBS)) {
             mockedStatic.when(() -> 
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(any(SQLStatementContext.class),
 anyString(), anyList(), eq(backendConnection)))
@@ -156,12 +178,19 @@ public final class MySQLComStmtExecuteExecutorTest 
extends ProxyContextRestorer
         assertThat(actual.next(), 
instanceOf(MySQLColumnDefinition41Packet.class));
         assertThat(actual.next(), instanceOf(MySQLEofPacket.class));
         assertFalse(actual.hasNext());
+        assertTrue(mysqlComStmtExecuteExecutor.next());
+        MySQLPacket actualQueryRowPacket = 
mysqlComStmtExecuteExecutor.getQueryRowPacket();
+        assertThat(actualQueryRowPacket, 
instanceOf(MySQLBinaryResultSetRowPacket.class));
+        assertThat(actualQueryRowPacket.getSequenceId(), is(4));
+        mysqlComStmtExecuteExecutor.close();
+        verify(databaseCommunicationEngine).close();
     }
     
     @Test
     public void assertIsUpdateResponse() throws SQLException {
         MySQLComStmtExecutePacket packet = 
mock(MySQLComStmtExecutePacket.class);
         when(packet.getStatementId()).thenReturn(2);
+        
when(packet.getNewParametersBoundFlag()).thenReturn(MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST);
         MySQLComStmtExecuteExecutor mysqlComStmtExecuteExecutor = new 
MySQLComStmtExecuteExecutor(packet, connectionSession);
         when(databaseCommunicationEngine.execute()).thenReturn(new 
UpdateResponseHeader(new MySQLUpdateStatement()));
         Iterator<DatabasePacket<?>> actual;
@@ -191,5 +220,7 @@ public final class MySQLComStmtExecuteExecutorTest extends 
ProxyContextRestorer
         assertThat(mysqlComStmtExecuteExecutor.getResponseType(), 
is(ResponseType.UPDATE));
         assertThat(actual.next(), instanceOf(MySQLOKPacket.class));
         assertFalse(actual.hasNext());
+        mysqlComStmtExecuteExecutor.close();
+        verify(textProtocolBackendHandler).close();
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
index 10f4ca924d2..ab5ecebff61 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
@@ -19,9 +19,30 @@ package 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prep
 
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPrepareOKPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
 import 
org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedStatementException;
+import org.apache.shardingsphere.proxy.frontend.mysql.ProxyContextRestorer;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
+import org.apache.shardingsphere.sql.parser.api.CacheOption;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLUpdateStatement;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
@@ -29,11 +50,18 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.sql.SQLException;
+import java.util.Iterator;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class MySQLComStmtPrepareExecutorTest {
+public final class MySQLComStmtPrepareExecutorTest extends 
ProxyContextRestorer {
     
     @Mock
     private MySQLComStmtPreparePacket packet;
@@ -41,12 +69,73 @@ public final class MySQLComStmtPrepareExecutorTest {
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ConnectionSession connectionSession;
     
+    @Before
+    public void setup() {
+        ProxyContext.init(mock(ContextManager.class, RETURNS_DEEP_STUBS));
+        prepareSQLParser();
+        when(connectionSession.getPreparedStatementRegistry()).thenReturn(new 
PreparedStatementRegistry());
+        
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_UNICODE_CI);
+    }
+    
+    private void prepareSQLParser() {
+        ContextManager contextManager = 
ProxyContext.getInstance().getContextManager();
+        MetaDataContexts metaDataContexts = 
contextManager.getMetaDataContexts();
+        
when(metaDataContexts.getMetaData().getGlobalRuleMetaData()).thenReturn(mock(ShardingSphereRuleMetaData.class));
+        CacheOption cacheOption = new CacheOption(1024, 1024);
+        
when(metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class))
+                .thenReturn(new SQLParserRule(new 
SQLParserRuleConfiguration(false, cacheOption, cacheOption)));
+        
when(metaDataContexts.getMetaData().getDatabases().get(connectionSession.getDatabaseName()).getProtocolType()).thenReturn(new
 MySQLDatabaseType());
+    }
+    
     @Test(expected = UnsupportedPreparedStatementException.class)
     public void assertPrepareMultiStatements() throws SQLException {
         when(packet.getSql()).thenReturn("update t set v=v+1 where id=1;update 
t set v=v+1 where id=2;update t set v=v+1 where id=3");
-        
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_UNICODE_CI);
         
when(connectionSession.getAttributeMap().hasAttr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS)).thenReturn(true);
         
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS).get()).thenReturn(0);
         new MySQLComStmtPrepareExecutor(packet, connectionSession).execute();
     }
+    
+    @Test
+    public void assertPrepareSelectStatement() throws SQLException {
+        String sql = "select name from t where id = ?";
+        when(packet.getSql()).thenReturn(sql);
+        when(connectionSession.getConnectionId()).thenReturn(1);
+        MySQLStatementIDGenerator.getInstance().registerConnection(1);
+        Iterator<DatabasePacket<?>> actualIterator = new 
MySQLComStmtPrepareExecutor(packet, connectionSession).execute().iterator();
+        assertThat(actualIterator.next(), 
instanceOf(MySQLComStmtPrepareOKPacket.class));
+        assertThat(actualIterator.next(), 
instanceOf(MySQLColumnDefinition41Packet.class));
+        assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+        assertThat(actualIterator.next(), 
instanceOf(MySQLColumnDefinition41Packet.class));
+        assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+        assertFalse(actualIterator.hasNext());
+        MySQLPreparedStatement actualPreparedStatement = 
connectionSession.getPreparedStatementRegistry().getPreparedStatement(1);
+        assertThat(actualPreparedStatement.getSql(), is(sql));
+        assertThat(actualPreparedStatement.getSqlStatement(), 
instanceOf(MySQLSelectStatement.class));
+        assertThat(actualPreparedStatement.getSqlStatementContext(), 
instanceOf(SelectStatementContext.class));
+        MySQLStatementIDGenerator.getInstance().unregisterConnection(1);
+    }
+    
+    @Test
+    public void assertPrepareUpdateStatement() throws SQLException {
+        String sql = "update t set v = ?";
+        when(packet.getSql()).thenReturn(sql);
+        when(connectionSession.getConnectionId()).thenReturn(1);
+        MySQLStatementIDGenerator.getInstance().registerConnection(1);
+        Iterator<DatabasePacket<?>> actualIterator = new 
MySQLComStmtPrepareExecutor(packet, connectionSession).execute().iterator();
+        assertThat(actualIterator.next(), 
instanceOf(MySQLComStmtPrepareOKPacket.class));
+        assertThat(actualIterator.next(), 
instanceOf(MySQLColumnDefinition41Packet.class));
+        assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+        assertFalse(actualIterator.hasNext());
+        MySQLPreparedStatement actualPreparedStatement = 
connectionSession.getPreparedStatementRegistry().getPreparedStatement(1);
+        assertThat(actualPreparedStatement.getSql(), is(sql));
+        assertThat(actualPreparedStatement.getSqlStatement(), 
instanceOf(MySQLUpdateStatement.class));
+        assertThat(actualPreparedStatement.getSqlStatementContext(), 
instanceOf(UpdateStatementContext.class));
+        MySQLStatementIDGenerator.getInstance().unregisterConnection(1);
+    }
+    
+    @Test(expected = UnsupportedPreparedStatementException.class)
+    public void assertPrepareNotAllowedStatement() throws SQLException {
+        when(packet.getSql()).thenReturn("begin");
+        new MySQLComStmtPrepareExecutor(packet, connectionSession).execute();
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
index 3052f9847ee..48880226557 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
@@ -27,13 +27,11 @@ import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.aware.ParameterAware;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -54,6 +52,7 @@ import 
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
 import 
org.apache.shardingsphere.proxy.frontend.reactive.command.executor.ReactiveCommandExecutor;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -90,16 +89,18 @@ public final class ReactiveMySQLComStmtExecuteExecutor 
implements ReactiveComman
     @Override
     public Future<Collection<DatabasePacket<?>>> executeFuture() {
         MySQLPreparedStatement preparedStatement = 
updateAndGetPreparedStatement();
-        String databaseName = connectionSession.getDatabaseName();
-        MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
-        SQLStatement sqlStatement = preparedStatement.getSqlStatement();
         List<Object> parameters = 
packet.readParameters(preparedStatement.getParameterTypes());
-        SQLStatementContext<?> sqlStatementContext = 
SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(),
 parameters,
-                sqlStatement, connectionSession.getDefaultDatabaseName());
+        SQLStatementContext<?> sqlStatementContext = 
preparedStatement.getSqlStatementContext();
+        if (sqlStatementContext instanceof ParameterAware) {
+            ((ParameterAware) sqlStatementContext).setUpParameters(parameters);
+        }
         // TODO optimize SQLStatementDatabaseHolder
         if (sqlStatementContext instanceof TableAvailable) {
             ((TableAvailable) 
sqlStatementContext).getTablesContext().getDatabaseName().ifPresent(SQLStatementDatabaseHolder::set);
         }
+        SQLStatement sqlStatement = preparedStatement.getSqlStatement();
+        String databaseName = connectionSession.getDatabaseName();
+        MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         SQLCheckEngine.check(sqlStatement, Collections.emptyList(), 
getRules(databaseName), databaseName, 
metaDataContexts.getMetaData().getDatabases(), connectionSession.getGrantee());
         int characterSet = 
connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
         // TODO Refactor the following branch
@@ -130,7 +131,7 @@ public final class ReactiveMySQLComStmtExecuteExecutor 
implements ReactiveComman
     }
     
     private MySQLPreparedStatement updateAndGetPreparedStatement() {
-        MySQLPreparedStatement result = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionSession.getConnectionId()).get(packet.getStatementId());
+        MySQLPreparedStatement result = 
connectionSession.getPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
         if (MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST == 
packet.getNewParametersBoundFlag()) {
             result.setParameterTypes(packet.getNewParameterTypes());
         }

Reply via email to