This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f22250a04 Init MySQL sequence ID when channel initializing (#23192)
d8f22250a04 is described below

commit d8f22250a04617eb3c9b19d9bf2fd134a35eac30
Author: 吴伟杰 <[email protected]>
AuthorDate: Fri Dec 30 15:38:20 2022 +0800

    Init MySQL sequence ID when channel initializing (#23192)
---
 .../frontend/executor/ConnectionThreadExecutorGroup.java      |  3 +++
 .../proxy/frontend/netty/ServerHandlerInitializer.java        |  1 +
 .../proxy/frontend/mysql/MySQLFrontendEngine.java             | 11 ++++++++++-
 .../mysql/authentication/MySQLAuthenticationEngine.java       |  6 ++----
 .../proxy/frontend/mysql/MySQLFrontendEngineTest.java         |  8 ++++++++
 .../proxy/frontend/spi/DatabaseProtocolFrontendEngine.java    |  9 +++++++++
 6 files changed, 33 insertions(+), 5 deletions(-)

diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
index 4731bc7355d..d89d7bdd7c5 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
@@ -81,6 +81,9 @@ public final class ConnectionThreadExecutorGroup {
      */
     public void unregisterAndAwaitTermination(final int connectionId) {
         ExecutorService executorService = 
executorServices.remove(connectionId);
+        if (null == executorService) {
+            return;
+        }
         executorService.shutdown();
         try {
             executorService.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
index c2fc6963999..358bed65e39 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
@@ -40,6 +40,7 @@ public final class ServerHandlerInitializer extends 
ChannelInitializer<SocketCha
     @Override
     protected void initChannel(final SocketChannel socketChannel) {
         DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine = 
TypedSPIRegistry.getRegisteredService(DatabaseProtocolFrontendEngine.class, 
databaseType.getType(), new Properties());
+        databaseProtocolFrontendEngine.initChannel(socketChannel);
         ChannelPipeline pipeline = socketChannel.pipeline();
         pipeline.addLast(new ChannelAttrInitializer());
         pipeline.addLast(new 
PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
diff --git 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
index 57fe1b0d083..ea9617241f5 100644
--- 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
+++ 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
@@ -17,12 +17,13 @@
 
 package org.apache.shardingsphere.proxy.frontend.mysql;
 
+import io.netty.channel.Channel;
 import lombok.Getter;
 import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
 import 
org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerInfo;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -31,8 +32,11 @@ import 
org.apache.shardingsphere.proxy.frontend.command.CommandExecuteEngine;
 import org.apache.shardingsphere.proxy.frontend.context.FrontendContext;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.authentication.MySQLAuthenticationEngine;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.MySQLCommandExecuteEngine;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import 
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Frontend engine for MySQL.
  */
@@ -52,6 +56,11 @@ public final class MySQLFrontendEngine implements 
DatabaseProtocolFrontendEngine
                 
.getContextManager().getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.PROXY_MYSQL_DEFAULT_VERSION));
     }
     
+    @Override
+    public void initChannel(final Channel channel) {
+        channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new 
AtomicInteger());
+    }
+    
     @Override
     public void setDatabaseVersion(final String databaseName, final String 
databaseVersion) {
         MySQLServerInfo.setServerVersion(databaseName, databaseVersion);
diff --git 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
index 4d5b4a4c241..3db3cb9886f 100644
--- 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
+++ 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
@@ -24,9 +24,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCapabilityFlag;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConnectionPhase;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
-import org.apache.shardingsphere.dialect.mysql.vendor.MySQLVendorError;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLStatusFlag;
-import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLAuthSwitchRequestPacket;
@@ -35,17 +33,18 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandsha
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandshakeResponse41Packet;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
+import org.apache.shardingsphere.dialect.mysql.vendor.MySQLVendorError;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationEngine;
 import 
org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationResult;
 import 
org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationResultBuilder;
 import 
org.apache.shardingsphere.proxy.frontend.connection.ConnectionIdGenerator;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.authentication.authenticator.MySQLAuthenticator;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Authentication engine for MySQL.
@@ -64,7 +63,6 @@ public final class MySQLAuthenticationEngine implements 
AuthenticationEngine {
     
     @Override
     public int handshake(final ChannelHandlerContext context) {
-        context.channel().attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new 
AtomicInteger());
         int result = ConnectionIdGenerator.getInstance().nextId();
         connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH;
         context.writeAndFlush(new MySQLHandshakePacket(result, 
authenticationHandler.getAuthPluginData()));
diff --git 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
index b3344b05f56..635c6d3ae78 100644
--- 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
+++ 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
@@ -60,12 +60,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -106,6 +108,12 @@ public final class MySQLFrontendEngineTest extends 
ProxyContextRestorer {
         mysqlFrontendEngine = new MySQLFrontendEngine();
     }
     
+    @Test
+    public void assertInitChannel() {
+        mysqlFrontendEngine.initChannel(channel);
+        
verify(channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID)).set(any(AtomicInteger.class));
+    }
+    
     @Test
     public void assertHandshake() {
         
assertTrue(mysqlFrontendEngine.getAuthenticationEngine().handshake(context) > 
0);
diff --git 
a/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
 
b/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
index 6fab06399a2..2c79559bd0c 100644
--- 
a/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
+++ 
b/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.proxy.frontend.spi;
 
+import io.netty.channel.Channel;
 import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationEngine;
@@ -29,6 +30,14 @@ import 
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
  */
 public interface DatabaseProtocolFrontendEngine extends TypedSPI {
     
+    /**
+     * Initialize channel.
+     * 
+     * @param channel channel
+     */
+    default void initChannel(Channel channel) {
+    }
+    
     /**
      * Set database version.
      * 

Reply via email to