This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a1a54ed20c5 Add more test cases on PostgreSQLCommandExecuteEngineTest 
and PostgreSQLAuthenticationEngineTest (#37918)
a1a54ed20c5 is described below

commit a1a54ed20c5729e8a5cf3f7450399ba99d13a0c5
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Feb 1 11:41:10 2026 +0800

    Add more test cases on PostgreSQLCommandExecuteEngineTest and 
PostgreSQLAuthenticationEngineTest (#37918)
    
    * Add more test cases on PostgreSQLCommandExecuteEngineTest and 
PostgreSQLAuthenticationEngineTest
    
    * Add more test cases on PostgreSQLCommandExecuteEngineTest and 
PostgreSQLAuthenticationEngineTest
    
    * Add more test cases on PostgreSQLCommandExecuteEngineTest and 
PostgreSQLAuthenticationEngineTest
    
    * Add more test cases on PostgreSQLCommandExecuteEngineTest and 
PostgreSQLAuthenticationEngineTest
---
 .../command/PostgreSQLCommandExecuteEngine.java    |  14 +-
 .../PostgreSQLAuthenticationEngineTest.java        | 232 ++++++++++++++++-----
 .../PostgreSQLCommandExecuteEngineTest.java        | 230 +++++++++++++++-----
 3 files changed, 360 insertions(+), 116 deletions(-)

diff --git 
a/proxy/frontend/dialect/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
 
b/proxy/frontend/dialect/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index 11c4d7b9986..a2ddd4c5f04 100644
--- 
a/proxy/frontend/dialect/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++ 
b/proxy/frontend/dialect/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -85,18 +85,14 @@ public final class PostgreSQLCommandExecuteEngine 
implements CommandExecuteEngin
         processSimpleQuery(context, databaseConnectionManager, 
queryCommandExecutor);
     }
     
-    private void processSimpleQuery(final ChannelHandlerContext context, final 
ProxyDatabaseConnectionManager databaseConnectionManager,
-                                    final QueryCommandExecutor queryExecutor) 
throws SQLException {
-        if (ResponseType.UPDATE == queryExecutor.getResponseType()) {
-            
context.write(databaseConnectionManager.getConnectionSession().getTransactionStatus().isInTransaction()
 ? PostgreSQLReadyForQueryPacket.IN_TRANSACTION
-                    : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
-            return;
-        }
-        long dataRows = writeDataPackets(context, databaseConnectionManager, 
queryExecutor);
+    private void processSimpleQuery(final ChannelHandlerContext context,
+                                    final ProxyDatabaseConnectionManager 
databaseConnectionManager, final QueryCommandExecutor queryExecutor) throws 
SQLException {
         if (ResponseType.QUERY == queryExecutor.getResponseType()) {
+            long dataRows = writeDataPackets(context, 
databaseConnectionManager, queryExecutor);
             context.write(new 
PostgreSQLCommandCompletePacket(PostgreSQLCommand.SELECT.name(), dataRows));
         }
-        
context.write(databaseConnectionManager.getConnectionSession().getTransactionStatus().isInTransaction()
 ? PostgreSQLReadyForQueryPacket.IN_TRANSACTION
+        
context.write(databaseConnectionManager.getConnectionSession().getTransactionStatus().isInTransaction()
+                ? PostgreSQLReadyForQueryPacket.IN_TRANSACTION
                 : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
     }
     
diff --git 
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngineTest.java
 
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngineTest.java
index 2fdfe15ec64..e7b8ec274a6 100644
--- 
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngineTest.java
+++ 
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngineTest.java
@@ -24,30 +24,43 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.Attribute;
 import lombok.SneakyThrows;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.authentication.result.AuthenticationResult;
 import org.apache.shardingsphere.authority.config.AuthorityRuleConfiguration;
 import org.apache.shardingsphere.authority.config.UserConfiguration;
+import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
 import org.apache.shardingsphere.authority.rule.AuthorityRule;
 import org.apache.shardingsphere.authority.rule.builder.AuthorityRuleBuilder;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import 
org.apache.shardingsphere.database.exception.core.exception.syntax.database.UnknownDatabaseException;
 import 
org.apache.shardingsphere.database.exception.postgresql.exception.authority.EmptyUsernameException;
 import 
org.apache.shardingsphere.database.exception.postgresql.exception.authority.InvalidPasswordException;
+import 
org.apache.shardingsphere.database.exception.postgresql.exception.authority.PrivilegeNotGrantedException;
+import 
org.apache.shardingsphere.database.exception.postgresql.exception.authority.UnknownUsernameException;
 import 
org.apache.shardingsphere.database.exception.postgresql.exception.protocol.ProtocolViolationException;
 import org.apache.shardingsphere.database.protocol.constant.CommonConstants;
-import org.apache.shardingsphere.database.protocol.payload.PacketPayload;
+import 
org.apache.shardingsphere.database.protocol.postgresql.constant.PostgreSQLAuthenticationMethod;
+import 
org.apache.shardingsphere.database.protocol.postgresql.packet.generic.PostgreSQLReadyForQueryPacket;
+import 
org.apache.shardingsphere.database.protocol.postgresql.packet.handshake.PostgreSQLAuthenticationOKPacket;
 import 
org.apache.shardingsphere.database.protocol.postgresql.packet.handshake.PostgreSQLSSLUnwillingPacket;
 import 
org.apache.shardingsphere.database.protocol.postgresql.packet.handshake.PostgreSQLSSLWillingPacket;
-import 
org.apache.shardingsphere.database.protocol.postgresql.packet.handshake.authentication.PostgreSQLMD5PasswordAuthenticationPacket;
+import 
org.apache.shardingsphere.database.protocol.postgresql.packet.handshake.authentication.PostgreSQLPasswordAuthenticationPacket;
 import 
org.apache.shardingsphere.database.protocol.postgresql.payload.PostgreSQLPacketPayload;
 import 
org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import 
org.apache.shardingsphere.proxy.frontend.postgresql.authentication.authenticator.impl.PostgreSQLMD5PasswordAuthenticator;
+import 
org.apache.shardingsphere.proxy.frontend.connection.ConnectionIdGenerator;
 import org.apache.shardingsphere.proxy.frontend.ssl.ProxySSLContext;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
@@ -60,7 +73,10 @@ import org.mockito.Mock;
 import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
 import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -78,9 +94,13 @@ import static org.mockito.Mockito.when;
 @StaticMockSettings({ProxyContext.class, ProxySSLContext.class})
 class PostgreSQLAuthenticationEngineTest {
     
-    private final String username = "root";
+    private static final String USERNAME = "root";
     
-    private final String password = "sharding";
+    private static final String PASSWORD = "sharding";
+    
+    private static final String DATABASE_NAME = "sharding_db";
+    
+    private final PostgreSQLAuthenticationEngine authenticationEngine = new 
PostgreSQLAuthenticationEngine();
     
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ChannelHandlerContext channelHandlerContext;
@@ -91,14 +111,20 @@ class PostgreSQLAuthenticationEngineTest {
         
when(channelHandlerContext.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY)).thenReturn(mock(Attribute.class));
     }
     
+    @SneakyThrows(ReflectiveOperationException.class)
+    @Test
+    void assertHandshakeAssignsNextConnectionId() {
+        
Plugins.getMemberAccessor().set(ConnectionIdGenerator.class.getDeclaredField("currentId"),
 ConnectionIdGenerator.getInstance(), 0);
+        assertThat(authenticationEngine.handshake(channelHandlerContext), 
is(1));
+    }
+    
     @Test
     void assertSSLUnwilling() {
         ByteBuf byteBuf = createByteBuf(8, 8);
         byteBuf.writeInt(8);
         byteBuf.writeInt(80877103);
-        PacketPayload payload = new PostgreSQLPacketPayload(byteBuf, 
StandardCharsets.UTF_8);
         ChannelHandlerContext context = mock(ChannelHandlerContext.class);
-        AuthenticationResult actual = new 
PostgreSQLAuthenticationEngine().authenticate(context, payload);
+        AuthenticationResult actual = 
authenticationEngine.authenticate(context, new PostgreSQLPacketPayload(byteBuf, 
StandardCharsets.UTF_8));
         verify(context).writeAndFlush(any(PostgreSQLSSLUnwillingPacket.class));
         assertFalse(actual.isFinished());
     }
@@ -108,15 +134,27 @@ class PostgreSQLAuthenticationEngineTest {
         ByteBuf byteBuf = createByteBuf(8, 8);
         byteBuf.writeInt(8);
         byteBuf.writeInt(80877103);
-        PacketPayload payload = new PostgreSQLPacketPayload(byteBuf, 
StandardCharsets.UTF_8);
         ChannelHandlerContext context = mock(ChannelHandlerContext.class, 
RETURNS_DEEP_STUBS);
         when(ProxySSLContext.getInstance().isSSLEnabled()).thenReturn(true);
-        AuthenticationResult actual = new 
PostgreSQLAuthenticationEngine().authenticate(context, payload);
+        AuthenticationResult actual = 
authenticationEngine.authenticate(context, new PostgreSQLPacketPayload(byteBuf, 
StandardCharsets.UTF_8));
         verify(context).writeAndFlush(any(PostgreSQLSSLWillingPacket.class));
         
verify(context.pipeline()).addFirst(eq(SslHandler.class.getSimpleName()), 
any(SslHandler.class));
         assertFalse(actual.isFinished());
     }
     
+    @Test
+    void assertSSLRequestCodeMismatchFallsBackToStartup() {
+        ByteBuf byteBuf = createByteBuf(8, 8);
+        byteBuf.writeInt(8);
+        byteBuf.writeInt(80877104);
+        ProxyContext proxyContext = mock(ProxyContext.class);
+        ContextManager contextManager = mock(ContextManager.class);
+        
when(contextManager.getMetaDataContexts()).thenReturn(createMetaDataContexts(mock(AuthorityRule.class),
 false, null));
+        when(ProxyContext.getInstance()).thenReturn(proxyContext);
+        when(proxyContext.getContextManager()).thenReturn(contextManager);
+        assertThrows(EmptyUsernameException.class, () -> 
authenticationEngine.authenticate(channelHandlerContext, new 
PostgreSQLPacketPayload(byteBuf, StandardCharsets.UTF_8)));
+    }
+    
     @Test
     void assertUserNotSet() {
         PostgreSQLPacketPayload payload = new 
PostgreSQLPacketPayload(createByteBuf(8, 512), StandardCharsets.UTF_8);
@@ -124,19 +162,34 @@ class PostgreSQLAuthenticationEngineTest {
         payload.writeInt4(196608);
         payload.writeStringNul("client_encoding");
         payload.writeStringNul("UTF8");
-        ContextManager contextManager = mockContextManager();
+        ContextManager contextManager = 
mockContextManager(createMetaDataContexts(mock(AuthorityRule.class), false, 
null));
+        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        assertThrows(EmptyUsernameException.class, () -> 
authenticationEngine.authenticate(channelHandlerContext, payload));
+    }
+    
+    @Test
+    void assertStartupUsesPasswordAuthenticator() {
+        UserConfiguration userConfig = new UserConfiguration(USERNAME, 
PASSWORD, "", PostgreSQLAuthenticationMethod.PASSWORD.getMethodName(), false);
+        Map<String, AlgorithmConfiguration> authenticators = 
Collections.singletonMap(
+                PostgreSQLAuthenticationMethod.PASSWORD.getMethodName(), new 
AlgorithmConfiguration(PostgreSQLAuthenticationMethod.PASSWORD.getMethodName(), 
new Properties()));
+        AuthorityRule authorityRule = createAuthorityRule(userConfig, 
authenticators, PostgreSQLAuthenticationMethod.PASSWORD.getMethodName());
+        MetaDataContexts metaDataContexts = 
createMetaDataContexts(authorityRule, false, null);
+        ContextManager contextManager = mockContextManager(metaDataContexts);
         
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
-        assertThrows(EmptyUsernameException.class, () -> new 
PostgreSQLAuthenticationEngine().authenticate(channelHandlerContext, payload));
+        AuthenticationResult actual = 
authenticationEngine.authenticate(channelHandlerContext, 
createStartupPayload(USERNAME, DATABASE_NAME));
+        ArgumentCaptor<Object> argumentCaptor = 
ArgumentCaptor.forClass(Object.class);
+        verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
+        assertThat(argumentCaptor.getValue().getClass(), is((Object) 
PostgreSQLPasswordAuthenticationPacket.class));
+        assertFalse(actual.isFinished());
     }
     
     @Test
     void assertAuthenticateWithNonPasswordMessage() {
-        PostgreSQLAuthenticationEngine authenticationEngine = new 
PostgreSQLAuthenticationEngine();
         setAlreadyReceivedStartupMessage(authenticationEngine);
         PostgreSQLPacketPayload payload = new 
PostgreSQLPacketPayload(createByteBuf(8, 16), StandardCharsets.UTF_8);
         payload.writeInt1('F');
         payload.writeInt8(0L);
-        ContextManager contextManager = mockContextManager();
+        ContextManager contextManager = 
mockContextManager(createMetaDataContexts(mock(AuthorityRule.class), false, 
null));
         
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
         assertThrows(ProtocolViolationException.class, () -> 
authenticationEngine.authenticate(channelHandlerContext, payload));
     }
@@ -147,71 +200,142 @@ class PostgreSQLAuthenticationEngineTest {
     }
     
     @Test
-    void assertLoginSuccessful() {
-        assertLogin(password);
+    void assertLoginFailed() {
+        AuthorityRule authorityRule = createAuthorityRule(new 
UserConfiguration(USERNAME, PASSWORD, "", null, false), Collections.emptyMap(), 
null);
+        MetaDataContexts metaDataContexts = 
createMetaDataContexts(authorityRule, true, DATABASE_NAME);
+        ContextManager contextManager = mockContextManager(metaDataContexts);
+        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        authenticationEngine.authenticate(channelHandlerContext, 
createStartupPayload(USERNAME, DATABASE_NAME));
+        byte[] md5Salt = getMd5Salt(authenticationEngine);
+        PostgreSQLPacketPayload passwordPayload = 
createPasswordMessage(createMd5Digest(USERNAME, "wrong" + PASSWORD, md5Salt));
+        assertThrows(InvalidPasswordException.class, () -> 
authenticationEngine.authenticate(channelHandlerContext, passwordPayload));
     }
     
     @Test
-    void assertLoginFailed() {
-        assertThrows(InvalidPasswordException.class, () -> assertLogin("wrong" 
+ password));
+    void assertLoginWithUnknownDatabase() {
+        PostgreSQLPacketPayload payload = createStartupPayload(USERNAME, 
"missing_db");
+        AuthorityRule authorityRule = createAuthorityRule(new 
UserConfiguration(USERNAME, PASSWORD, "", null, false), Collections.emptyMap(), 
null);
+        MetaDataContexts metaDataContexts = 
createMetaDataContexts(authorityRule, false, null);
+        ContextManager contextManager = mockContextManager(metaDataContexts);
+        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        authenticationEngine.authenticate(channelHandlerContext, payload);
+        byte[] md5Salt = getMd5Salt(authenticationEngine);
+        PostgreSQLPacketPayload passwordPayload = 
createPasswordMessage(createMd5Digest(USERNAME, PASSWORD, md5Salt));
+        assertThrows(UnknownDatabaseException.class, () -> 
authenticationEngine.authenticate(channelHandlerContext, passwordPayload));
     }
     
-    @SneakyThrows(ReflectiveOperationException.class)
-    private void assertLogin(final String inputPassword) {
-        PostgreSQLPacketPayload payload = new 
PostgreSQLPacketPayload(createByteBuf(16, 128), StandardCharsets.UTF_8);
+    @Test
+    void assertLoginWithUnknownUsername() {
+        PostgreSQLPacketPayload payload = createStartupPayload(USERNAME, null);
+        AuthorityRule authorityRule = mock(AuthorityRule.class);
+        
when(authorityRule.findUser(any(Grantee.class))).thenReturn(Optional.empty());
+        MetaDataContexts metaDataContexts = 
createMetaDataContexts(authorityRule, true, DATABASE_NAME);
+        ContextManager contextManager = mockContextManager(metaDataContexts);
+        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        authenticationEngine.authenticate(channelHandlerContext, payload);
+        byte[] md5Salt = getMd5Salt(authenticationEngine);
+        PostgreSQLPacketPayload passwordPayload = 
createPasswordMessage(createMd5Digest(USERNAME, PASSWORD, md5Salt));
+        assertThrows(UnknownUsernameException.class, () -> 
authenticationEngine.authenticate(channelHandlerContext, passwordPayload));
+    }
+    
+    @Test
+    void assertLoginWithoutPrivilege() {
+        AuthorityRule authorityRule = mock(AuthorityRule.class);
+        ShardingSphereUser user = new ShardingSphereUser(USERNAME, PASSWORD, 
"");
+        ShardingSpherePrivileges privileges = 
mock(ShardingSpherePrivileges.class);
+        when(privileges.hasPrivileges(DATABASE_NAME)).thenReturn(false);
+        
when(authorityRule.findUser(any(Grantee.class))).thenReturn(Optional.of(user));
+        
when(authorityRule.findPrivileges(any(Grantee.class))).thenReturn(Optional.of(privileges));
+        
when(authorityRule.getAuthenticatorType(user)).thenReturn(PostgreSQLAuthenticationMethod.MD5.getMethodName());
+        MetaDataContexts metaDataContexts = 
createMetaDataContexts(authorityRule, true, DATABASE_NAME);
+        ContextManager contextManager = mockContextManager(metaDataContexts);
+        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        authenticationEngine.authenticate(channelHandlerContext, 
createStartupPayload(USERNAME, DATABASE_NAME));
+        byte[] md5Salt = getMd5Salt(authenticationEngine);
+        PostgreSQLPacketPayload passwordPayload = 
createPasswordMessage(createMd5Digest(USERNAME, PASSWORD, md5Salt));
+        assertThrows(PrivilegeNotGrantedException.class, () -> 
authenticationEngine.authenticate(channelHandlerContext, passwordPayload));
+    }
+    
+    @Test
+    void assertLoginWithNullDatabase() {
+        PostgreSQLPacketPayload payload = createStartupPayload(USERNAME, null);
+        AuthorityRule authorityRule = createAuthorityRule(new 
UserConfiguration(USERNAME, PASSWORD, "", null, false), Collections.emptyMap(), 
null);
+        MetaDataContexts metaDataContexts = 
createMetaDataContexts(authorityRule, false, null);
+        ContextManager contextManager = mockContextManager(metaDataContexts);
+        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        authenticationEngine.authenticate(channelHandlerContext, payload);
+        byte[] md5Salt = getMd5Salt(authenticationEngine);
+        PostgreSQLPacketPayload passwordPayload = 
createPasswordMessage(createMd5Digest(USERNAME, PASSWORD, md5Salt));
+        AuthenticationResult actual = 
authenticationEngine.authenticate(channelHandlerContext, passwordPayload);
+        
verify(channelHandlerContext).write(any(PostgreSQLAuthenticationOKPacket.class));
+        
verify(channelHandlerContext).writeAndFlush(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
+        assertThat(actual.isFinished(), is(true));
+    }
+    
+    private ByteBuf createByteBuf(final int initialCapacity, final int 
maxCapacity) {
+        return new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, 
initialCapacity, maxCapacity);
+    }
+    
+    private PostgreSQLPacketPayload createStartupPayload(final String 
username, final String databaseName) {
+        PostgreSQLPacketPayload payload = new 
PostgreSQLPacketPayload(createByteBuf(32, 256), StandardCharsets.UTF_8);
         payload.writeInt4(64);
         payload.writeInt4(196608);
         payload.writeStringNul("user");
         payload.writeStringNul(username);
+        if (null != databaseName) {
+            payload.writeStringNul("database");
+            payload.writeStringNul(databaseName);
+        }
         payload.writeStringNul("client_encoding");
         payload.writeStringNul("UTF8");
-        PostgreSQLAuthenticationEngine engine = new 
PostgreSQLAuthenticationEngine();
-        ContextManager contextManager = mockContextManager();
-        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
-        AuthenticationResult actual = 
engine.authenticate(channelHandlerContext, payload);
-        assertFalse(actual.isFinished());
-        assertThat(actual.getUsername(), is(username));
-        ArgumentCaptor<PostgreSQLMD5PasswordAuthenticationPacket> 
argumentCaptor = 
ArgumentCaptor.forClass(PostgreSQLMD5PasswordAuthenticationPacket.class);
-        verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
-        PostgreSQLMD5PasswordAuthenticationPacket md5PasswordPacket = 
argumentCaptor.getValue();
-        byte[] md5Salt = getMd5Salt(md5PasswordPacket);
-        payload = new PostgreSQLPacketPayload(createByteBuf(16, 128), 
StandardCharsets.UTF_8);
-        String md5Digest = (String) 
Plugins.getMemberAccessor().invoke(PostgreSQLMD5PasswordAuthenticator.class.getDeclaredMethod("md5Encode",
 String.class, String.class, byte[].class),
-                new PostgreSQLMD5PasswordAuthenticator(), username, 
inputPassword, md5Salt);
-        payload.writeInt1('p');
-        payload.writeInt4(4 + md5Digest.length() + 1);
-        payload.writeStringNul(md5Digest);
-        MetaDataContexts metaDataContexts = getMetaDataContexts(new 
UserConfiguration(username, password, "", null, false));
-        
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
-        actual = engine.authenticate(channelHandlerContext, payload);
-        assertThat(actual.isFinished(), is(password.equals(inputPassword)));
+        return payload;
     }
     
-    private ByteBuf createByteBuf(final int initialCapacity, final int 
maxCapacity) {
-        return new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, 
initialCapacity, maxCapacity);
+    private PostgreSQLPacketPayload createPasswordMessage(final String digest) 
{
+        PostgreSQLPacketPayload payload = new 
PostgreSQLPacketPayload(createByteBuf(16, 128), StandardCharsets.UTF_8);
+        payload.writeInt1('p');
+        payload.writeInt4(4 + digest.length() + 1);
+        payload.writeStringNul(digest);
+        return payload;
     }
     
-    private ContextManager mockContextManager() {
-        ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
-        
when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(new
 RuleMetaData(Collections.singleton(mock(AuthorityRule.class))));
+    private ContextManager mockContextManager(final MetaDataContexts 
metaDataContexts) {
+        ContextManager result = mock(ContextManager.class);
+        when(result.getMetaDataContexts()).thenReturn(metaDataContexts);
         return result;
     }
     
-    private MetaDataContexts getMetaDataContexts(final UserConfiguration 
userConfig) {
-        ShardingSphereMetaData metaData = new ShardingSphereMetaData(
-                Collections.emptyList(), mock(ResourceMetaData.class), 
buildGlobalRuleMetaData(userConfig), new ConfigurationProperties(new 
Properties()));
+    private MetaDataContexts createMetaDataContexts(final AuthorityRule 
authorityRule, final boolean containsDatabase, final String databaseName) {
+        RuleMetaData ruleMetaData = new 
RuleMetaData(Collections.singleton(authorityRule));
+        ShardingSphereMetaData metaData = new 
ShardingSphereMetaData(containsDatabase && null != databaseName ? 
Collections.singleton(createDatabase(databaseName)) : Collections.emptyList(),
+                mock(ResourceMetaData.class), ruleMetaData, new 
ConfigurationProperties(new Properties()));
         return new MetaDataContexts(metaData, new ShardingSphereStatistics());
     }
     
-    private RuleMetaData buildGlobalRuleMetaData(final UserConfiguration 
userConfig) {
+    private ShardingSphereDatabase createDatabase(final String databaseName) {
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, 
RETURNS_DEEP_STUBS);
+        when(database.getName()).thenReturn(databaseName);
+        
when(database.getProtocolType()).thenReturn(TypedSPILoader.getService(DatabaseType.class,
 "PostgreSQL"));
+        when(database.getRuleMetaData()).thenReturn(new 
RuleMetaData(Collections.emptyList()));
+        return database;
+    }
+    
+    private AuthorityRule createAuthorityRule(final UserConfiguration 
userConfig, final Map<String, AlgorithmConfiguration> authenticators, final 
String defaultAuthenticator) {
         AuthorityRuleConfiguration ruleConfig = new AuthorityRuleConfiguration(
-                Collections.singleton(userConfig), new 
AlgorithmConfiguration("ALL_PERMITTED", new Properties()), 
Collections.emptyMap(), null);
-        AuthorityRule rule = new AuthorityRuleBuilder().build(ruleConfig, 
Collections.emptyList(), mock(ConfigurationProperties.class));
-        return new RuleMetaData(Collections.singleton(rule));
+                Collections.singleton(userConfig), new 
AlgorithmConfiguration("ALL_PERMITTED", new Properties()), authenticators, 
defaultAuthenticator);
+        return new AuthorityRuleBuilder().build(ruleConfig, 
Collections.emptyList(), mock(ConfigurationProperties.class));
+    }
+    
+    private String createMd5Digest(final String username, final String 
password, final byte[] md5Salt) {
+        String passwordHash = new 
String(Hex.encodeHex(DigestUtils.md5(password + username), true));
+        MessageDigest messageDigest = DigestUtils.getMd5Digest();
+        messageDigest.update(passwordHash.getBytes());
+        messageDigest.update(md5Salt);
+        return "md5" + new String(Hex.encodeHex(messageDigest.digest(), true));
     }
     
     @SneakyThrows(ReflectiveOperationException.class)
-    private byte[] getMd5Salt(final PostgreSQLMD5PasswordAuthenticationPacket 
md5PasswordPacket) {
-        return (byte[]) 
Plugins.getMemberAccessor().get(PostgreSQLMD5PasswordAuthenticationPacket.class.getDeclaredField("md5Salt"),
 md5PasswordPacket);
+    private byte[] getMd5Salt(final PostgreSQLAuthenticationEngine target) {
+        return (byte[]) 
Plugins.getMemberAccessor().get(PostgreSQLAuthenticationEngine.class.getDeclaredField("md5Salt"),
 target);
     }
 }
diff --git 
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
 
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
index 754764e0dbf..997317ca525 100644
--- 
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
+++ 
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
@@ -17,125 +17,249 @@
 
 package org.apache.shardingsphere.proxy.frontend.postgresql.command;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.shardingsphere.database.protocol.packet.DatabasePacket;
+import 
org.apache.shardingsphere.database.protocol.postgresql.constant.PostgreSQLMessageSeverityLevel;
 import 
org.apache.shardingsphere.database.protocol.postgresql.packet.PostgreSQLPacket;
+import 
org.apache.shardingsphere.database.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
+import 
org.apache.shardingsphere.database.protocol.postgresql.packet.command.PostgreSQLCommandPacketFactory;
+import 
org.apache.shardingsphere.database.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
+import 
org.apache.shardingsphere.database.protocol.postgresql.packet.command.query.PostgreSQLDataRowPacket;
 import 
org.apache.shardingsphere.database.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
+import 
org.apache.shardingsphere.database.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
 import 
org.apache.shardingsphere.database.protocol.postgresql.packet.generic.PostgreSQLReadyForQueryPacket;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import 
org.apache.shardingsphere.database.protocol.postgresql.payload.PostgreSQLPacketPayload;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
-import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
+import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
 import 
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
 import 
org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionResourceLock;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
+import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
+import 
org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
-import 
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.simple.PostgreSQLComQueryExecutor;
+import 
org.apache.shardingsphere.proxy.frontend.postgresql.err.PostgreSQLErrorPacketFactory;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
 import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.internal.configuration.plugins.Plugins;
 import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.plugins.MemberAccessor;
 import org.mockito.quality.Strictness;
 
+import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Properties;
+import java.util.Optional;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(AutoMockExtension.class)
-@StaticMockSettings(ProxyContext.class)
+@StaticMockSettings({
+        ProxyContext.class,
+        PostgreSQLCommandPacketFactory.class,
+        PostgreSQLCommandExecutorFactory.class,
+        PostgreSQLErrorPacketFactory.class,
+        PostgreSQLPortalContextRegistry.class
+})
 @MockitoSettings(strictness = Strictness.LENIENT)
 class PostgreSQLCommandExecuteEngineTest {
     
-    @Mock
+    private final PostgreSQLCommandExecuteEngine commandExecuteEngine = new 
PostgreSQLCommandExecuteEngine();
+    
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ChannelHandlerContext channelHandlerContext;
     
     @Mock
     private Channel channel;
     
     @Mock
-    private PostgreSQLComQueryExecutor queryCommandExecutor;
+    private QueryCommandExecutor queryCommandExecutor;
+    
+    @Mock
+    private ProxyDatabaseConnectionManager databaseConnectionManager;
     
     @Mock
     private ConnectionSession connectionSession;
     
+    @Mock
+    private ConnectionResourceLock connectionResourceLock;
+    
+    private TransactionStatus transactionStatus;
+    
     @BeforeEach
     void setUp() {
         when(channelHandlerContext.channel()).thenReturn(channel);
-        when(connectionSession.getTransactionStatus()).thenReturn(new 
TransactionStatus());
+        transactionStatus = new TransactionStatus();
+        
when(connectionSession.getTransactionStatus()).thenReturn(transactionStatus);
+        
when(databaseConnectionManager.getConnectionSession()).thenReturn(connectionSession);
+        
when(databaseConnectionManager.getConnectionResourceLock()).thenReturn(connectionResourceLock);
     }
     
     @Test
-    void assertSimpleQueryWithUpdateResponseWriteQueryData() throws 
SQLException {
-        PostgreSQLComQueryExecutor comQueryExecutor = 
mock(PostgreSQLComQueryExecutor.class);
-        
when(comQueryExecutor.getResponseType()).thenReturn(ResponseType.UPDATE);
-        PostgreSQLCommandExecuteEngine commandExecuteEngine = new 
PostgreSQLCommandExecuteEngine();
-        ProxyDatabaseConnectionManager databaseConnectionManager = 
mock(ProxyDatabaseConnectionManager.class);
-        
when(databaseConnectionManager.getConnectionSession()).thenReturn(connectionSession);
-        commandExecuteEngine.writeQueryData(channelHandlerContext, 
databaseConnectionManager, comQueryExecutor, 0);
-        
verify(channelHandlerContext).write(any(PostgreSQLReadyForQueryPacket.class));
+    void assertGetCommandPacketType() {
+        ByteBuf byteBuf = Unpooled.buffer();
+        byteBuf.writeByte(PostgreSQLCommandPacketType.SIMPLE_QUERY.getValue());
+        PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf, 
StandardCharsets.UTF_8);
+        assertThat(commandExecuteEngine.getCommandPacketType(payload), 
is(PostgreSQLCommandPacketType.SIMPLE_QUERY));
+    }
+    
+    @Test
+    void assertGetCommandPacket() {
+        PostgreSQLPacketPayload payload = mock(PostgreSQLPacketPayload.class);
+        PostgreSQLCommandPacket expectedPacket = 
mock(PostgreSQLCommandPacket.class);
+        
when(PostgreSQLCommandPacketFactory.newInstance(PostgreSQLCommandPacketType.SIMPLE_QUERY,
 payload)).thenReturn(expectedPacket);
+        assertThat(commandExecuteEngine.getCommandPacket(payload, 
PostgreSQLCommandPacketType.SIMPLE_QUERY, connectionSession), 
is(expectedPacket));
+    }
+    
+    @Test
+    void assertGetCommandExecutor() throws SQLException {
+        when(connectionSession.getConnectionId()).thenReturn(1);
+        PostgreSQLPortalContextRegistry registry = 
mock(PostgreSQLPortalContextRegistry.class);
+        
when(PostgreSQLPortalContextRegistry.getInstance()).thenReturn(registry);
+        PortalContext portalContext = mock(PortalContext.class);
+        when(registry.get(1)).thenReturn(portalContext);
+        PostgreSQLCommandPacket commandPacket = 
mock(PostgreSQLCommandPacket.class);
+        CommandExecutor expectedExecutor = mock(CommandExecutor.class);
+        
when(PostgreSQLCommandExecutorFactory.newInstance(PostgreSQLCommandPacketType.SIMPLE_QUERY,
 commandPacket, connectionSession, portalContext)).thenReturn(expectedExecutor);
+        
assertThat(commandExecuteEngine.getCommandExecutor(PostgreSQLCommandPacketType.SIMPLE_QUERY,
 commandPacket, connectionSession), is(expectedExecutor));
+    }
+    
+    @Test
+    void assertGetErrorPacket() {
+        Exception cause = new Exception("error");
+        PostgreSQLErrorResponsePacket expectedPacket = 
PostgreSQLErrorResponsePacket.newBuilder(PostgreSQLMessageSeverityLevel.ERROR, 
"state", "message").build();
+        
when(PostgreSQLErrorPacketFactory.newInstance(cause)).thenReturn(expectedPacket);
+        assertThat(commandExecuteEngine.getErrorPacket(cause), 
is(expectedPacket));
+    }
+    
+    @Test
+    void assertGetOtherPacketWhenInTransaction() {
+        transactionStatus.setInTransaction(true);
+        Optional<DatabasePacket> actual = 
commandExecuteEngine.getOtherPacket(connectionSession);
+        assertThat(actual.isPresent(), is(true));
+        assertThat(actual.get(), 
is(PostgreSQLReadyForQueryPacket.TRANSACTION_FAILED));
+    }
+    
+    @Test
+    void assertGetOtherPacketWhenNotInTransaction() {
+        Optional<DatabasePacket> actual = 
commandExecuteEngine.getOtherPacket(connectionSession);
+        assertThat(actual.isPresent(), is(true));
+        assertThat(actual.get(), 
is(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION));
+    }
+    
+    @Test
+    void assertWriteQueryDataWhenChannelInactiveForQuery() throws SQLException 
{
+        
when(queryCommandExecutor.getResponseType()).thenReturn(ResponseType.QUERY);
+        commandExecuteEngine.writeQueryData(channelHandlerContext, 
databaseConnectionManager, queryCommandExecutor, 0);
+        verify(queryCommandExecutor, never()).next();
+        
verify(channelHandlerContext).write(isA(PostgreSQLCommandCompletePacket.class));
     }
     
     @Test
-    void assertWriteQueryDataWithUpdate() throws SQLException {
-        PostgreSQLCommandExecuteEngine commandExecuteEngine = new 
PostgreSQLCommandExecuteEngine();
+    void assertWriteQueryDataForUpdateWhenInTransaction() throws SQLException {
+        transactionStatus.setInTransaction(true);
         
when(queryCommandExecutor.getResponseType()).thenReturn(ResponseType.UPDATE);
-        ProxyDatabaseConnectionManager databaseConnectionManager = 
mock(ProxyDatabaseConnectionManager.class, RETURNS_DEEP_STUBS);
-        
when(databaseConnectionManager.getConnectionSession()).thenReturn(connectionSession);
         commandExecuteEngine.writeQueryData(channelHandlerContext, 
databaseConnectionManager, queryCommandExecutor, 0);
-        
verify(channelHandlerContext).write(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
+        
verify(channelHandlerContext).write(PostgreSQLReadyForQueryPacket.IN_TRANSACTION);
     }
     
     @Test
-    void assertWriteQueryDataWithInactiveChannel() throws SQLException {
-        PostgreSQLCommandExecuteEngine commandExecuteEngine = new 
PostgreSQLCommandExecuteEngine();
+    void assertWriteQueryDataForQueryFlushAndCountDataRows() throws 
SQLException, NoSuchFieldException, IllegalAccessException {
         
when(queryCommandExecutor.getResponseType()).thenReturn(ResponseType.QUERY);
-        when(channel.isActive()).thenReturn(false);
-        commandExecuteEngine.writeQueryData(channelHandlerContext, 
mock(ProxyDatabaseConnectionManager.class), queryCommandExecutor, 0);
-        
verify(channelHandlerContext).write(isA(PostgreSQLCommandCompletePacket.class));
+        when(channel.isActive()).thenReturn(true);
+        when(queryCommandExecutor.next()).thenReturn(true, false);
+        PostgreSQLDataRowPacket dataRowPacket = 
mock(PostgreSQLDataRowPacket.class);
+        
when(queryCommandExecutor.getQueryRowPacket()).thenReturn(dataRowPacket);
+        mockProxyContextFlushThreshold(1);
+        commandExecuteEngine.writeQueryData(channelHandlerContext, 
databaseConnectionManager, queryCommandExecutor, 0);
+        verify(connectionResourceLock).doAwait(channelHandlerContext);
+        verify(channelHandlerContext).write(dataRowPacket);
+        verify(channelHandlerContext).flush();
+        PostgreSQLCommandCompletePacket commandCompletePacket = 
captureCommandCompletePacket(3);
+        assertThat(getCommandCompleteRowCount(commandCompletePacket), is(1L));
+        
verify(channelHandlerContext).write(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
     }
     
     @Test
-    void assertWriteQueryDataWithHasNextResult() throws SQLException {
-        PostgreSQLComQueryExecutor queryCommandExecutor = 
mock(PostgreSQLComQueryExecutor.class);
+    void assertWriteQueryDataForQueryWithoutFlushAndNonDataRow() throws 
SQLException, NoSuchFieldException, IllegalAccessException {
         
when(queryCommandExecutor.getResponseType()).thenReturn(ResponseType.QUERY);
         when(channel.isActive()).thenReturn(true);
         when(queryCommandExecutor.next()).thenReturn(true, false);
-        when(channel.isWritable()).thenReturn(false, true);
-        ConnectionResourceLock connectionResourceLock = 
mock(ConnectionResourceLock.class);
-        ProxyDatabaseConnectionManager databaseConnectionManager = 
mock(ProxyDatabaseConnectionManager.class);
-        
when(databaseConnectionManager.getConnectionResourceLock()).thenReturn(connectionResourceLock);
-        
when(databaseConnectionManager.getConnectionSession()).thenReturn(connectionSession);
-        PostgreSQLPacket packet = mock(PostgreSQLPacket.class);
-        when(queryCommandExecutor.getQueryRowPacket()).thenReturn(packet);
-        PostgreSQLCommandExecuteEngine commandExecuteEngine = new 
PostgreSQLCommandExecuteEngine();
-        ComputeNodeInstanceContext computeNodeInstanceContext = 
mock(ComputeNodeInstanceContext.class);
-        
when(computeNodeInstanceContext.getModeConfiguration()).thenReturn(mock(ModeConfiguration.class));
-        ContextManager contextManager = new ContextManager(
-                new MetaDataContexts(new 
ShardingSphereMetaData(Collections.emptyList(), new 
ResourceMetaData(Collections.emptyMap()),
-                        new RuleMetaData(Collections.emptyList()), new 
ConfigurationProperties(new Properties())), new ShardingSphereStatistics()),
-                computeNodeInstanceContext, mock(), mock());
-        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        PostgreSQLPacket rowPacket = mock(PostgreSQLPacket.class);
+        when(queryCommandExecutor.getQueryRowPacket()).thenReturn(rowPacket);
+        mockProxyContextFlushThreshold(2);
         commandExecuteEngine.writeQueryData(channelHandlerContext, 
databaseConnectionManager, queryCommandExecutor, 0);
         verify(connectionResourceLock).doAwait(channelHandlerContext);
-        verify(channelHandlerContext).write(packet);
-        
verify(channelHandlerContext).write(isA(PostgreSQLCommandCompletePacket.class));
-        
verify(channelHandlerContext).write(isA(PostgreSQLReadyForQueryPacket.class));
+        verify(channelHandlerContext).write(rowPacket);
+        verify(channelHandlerContext, never()).flush();
+        PostgreSQLCommandCompletePacket commandCompletePacket = 
captureCommandCompletePacket(3);
+        assertThat(getCommandCompleteRowCount(commandCompletePacket), is(0L));
+        
verify(channelHandlerContext).write(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
+    }
+    
+    @Test
+    void assertWriteQueryDataForQueryWithoutRows() throws SQLException, 
NoSuchFieldException, IllegalAccessException {
+        
when(queryCommandExecutor.getResponseType()).thenReturn(ResponseType.QUERY);
+        when(channel.isActive()).thenReturn(true);
+        when(queryCommandExecutor.next()).thenReturn(false);
+        mockProxyContextFlushThreshold(2);
+        commandExecuteEngine.writeQueryData(channelHandlerContext, 
databaseConnectionManager, queryCommandExecutor, 0);
+        verify(connectionResourceLock, never()).doAwait(any());
+        verify(channelHandlerContext, 
never()).write(isA(PostgreSQLDataRowPacket.class));
+        PostgreSQLCommandCompletePacket commandCompletePacket = 
captureCommandCompletePacket(2);
+        assertThat(getCommandCompleteRowCount(commandCompletePacket), is(0L));
+        
verify(channelHandlerContext).write(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
+    }
+    
+    @Test
+    void assertWriteQueryDataForQueryInTransaction() throws SQLException, 
NoSuchFieldException, IllegalAccessException {
+        transactionStatus.setInTransaction(true);
+        
when(queryCommandExecutor.getResponseType()).thenReturn(ResponseType.QUERY);
+        when(channel.isActive()).thenReturn(true);
+        when(queryCommandExecutor.next()).thenReturn(false);
+        mockProxyContextFlushThreshold(2);
+        commandExecuteEngine.writeQueryData(channelHandlerContext, 
databaseConnectionManager, queryCommandExecutor, 0);
+        PostgreSQLCommandCompletePacket commandCompletePacket = 
captureCommandCompletePacket(2);
+        assertThat(getCommandCompleteRowCount(commandCompletePacket), is(0L));
+        
verify(channelHandlerContext).write(PostgreSQLReadyForQueryPacket.IN_TRANSACTION);
+    }
+    
+    private void mockProxyContextFlushThreshold(final int threshold) {
+        ProxyContext proxyContext = mock(ProxyContext.class, 
RETURNS_DEEP_STUBS);
+        
when(proxyContext.getContextManager().getMetaDataContexts().getMetaData().getProps())
+                .thenReturn(new 
ConfigurationProperties(PropertiesBuilder.build(new 
Property(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD.getKey(), 
threshold))));
+        when(ProxyContext.getInstance()).thenReturn(proxyContext);
+    }
+    
+    private PostgreSQLCommandCompletePacket captureCommandCompletePacket(final 
int expectedWrites) {
+        ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
+        verify(channelHandlerContext, 
times(expectedWrites)).write(captor.capture());
+        return (PostgreSQLCommandCompletePacket) 
captor.getAllValues().stream().filter(each -> each instanceof 
PostgreSQLCommandCompletePacket).findFirst()
+                .orElseThrow(() -> new AssertionError("CommandComplete packet 
not written"));
+    }
+    
+    private long getCommandCompleteRowCount(final 
PostgreSQLCommandCompletePacket packet) throws NoSuchFieldException, 
IllegalAccessException {
+        MemberAccessor accessor = Plugins.getMemberAccessor();
+        return (long) 
accessor.get(PostgreSQLCommandCompletePacket.class.getDeclaredField("rowCount"),
 packet);
     }
 }


Reply via email to