This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e02fcd  Refactor ChannelThreadExecutorGroup to 
ConnectionThreadExecutorGroup (#10260)
3e02fcd is described below

commit 3e02fcd28a103e356321026cf4b70aec0f9677a7
Author: 吴伟杰 <[email protected]>
AuthorDate: Wed May 12 21:45:26 2021 +0800

    Refactor ChannelThreadExecutorGroup to ConnectionThreadExecutorGroup 
(#10260)
---
 .../jdbc/connection/BackendConnection.java         |  3 -
 .../frontend/command/CommandExecutorTask.java      |  1 -
 .../executor/ChannelThreadExecutorGroup.java       | 80 --------------------
 .../frontend/executor/CommandExecutorSelector.java | 13 +---
 .../executor/ConnectionThreadExecutorGroup.java    | 86 ++++++++++++++++++++++
 .../netty/FrontendChannelInboundHandler.java       | 15 ++--
 .../proxy/frontend/state/impl/OKProxyState.java    | 10 +--
 .../executor/CommandExecutorSelectorTest.java      | 24 +++---
 ...java => ConnectionThreadExecutorGroupTest.java} | 20 +++--
 .../postgresql/PostgreSQLFrontendEngine.java       | 11 ---
 .../postgresql/PostgreSQLFrontendEngineTest.java   |  3 +
 11 files changed, 123 insertions(+), 143 deletions(-)

diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 2d983e5..50a5bc3 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -50,7 +50,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Backend connection.
@@ -73,8 +72,6 @@ public final class BackendConnection implements 
ExecutorJDBCManager {
     @Setter
     private volatile CalciteExecutor calciteExecutor;
     
-    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
-    
     private final Multimap<String, Connection> cachedConnections = 
LinkedHashMultimap.create();
     
     private final Collection<Statement> cachedStatements = new 
CopyOnWriteArrayList<>();
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index a8c118b..0323a9f 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -74,7 +74,6 @@ public final class CommandExecutorTask implements Runnable {
             // CHECKSTYLE:ON
             processException(ex);
         } finally {
-            backendConnection.getSubmittedTaskCount().decrementAndGet();
             Collection<SQLException> exceptions = closeExecutionResources();
             if (isNeedFlush) {
                 context.flush();
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroup.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroup.java
deleted file mode 100644
index ddb5487..0000000
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroup.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.frontend.executor;
-
-import io.netty.channel.ChannelId;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Channel thread executor group.
- * 
- * <p>
- *     Manage the thread for each channel invoking.
- *     This ensure XA transaction framework processed by current thread id.
- * </p>
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ChannelThreadExecutorGroup {
-    
-    private static final ChannelThreadExecutorGroup INSTANCE = new 
ChannelThreadExecutorGroup();
-    
-    private final Map<ChannelId, ExecutorService> executorServices = new 
ConcurrentHashMap<>();
-    
-    /**
-     * Get channel thread executor group.
-     * 
-     * @return channel thread executor group
-     */
-    public static ChannelThreadExecutorGroup getInstance() {
-        return INSTANCE;
-    }
-    
-    /**
-     * Register channel.
-     *
-     * @param channelId channel id
-     */
-    public void register(final ChannelId channelId) {
-        executorServices.put(channelId, Executors.newSingleThreadExecutor());
-    }
-    
-    /**
-     * Get executor service of current channel.
-     *
-     * @param channelId channel id
-     * @return executor service of current channel
-     */
-    public ExecutorService get(final ChannelId channelId) {
-        return executorServices.get(channelId);
-    }
-    
-    /**
-     * Unregister channel.
-     *
-     * @param channelId channel id
-     */
-    public void unregister(final ChannelId channelId) {
-        executorServices.remove(channelId).shutdown();
-    }
-}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelector.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelector.java
index bb3d81b..c199a8f 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelector.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelector.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.proxy.frontend.executor;
 
-import io.netty.channel.ChannelId;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.transaction.core.TransactionType;
@@ -36,15 +35,11 @@ public final class CommandExecutorSelector {
      * @param isOccupyThreadForPerConnection is occupy thread for per 
connection or not
      * @param supportHint is support hint
      * @param transactionType transaction type
-     * @param channelId channel ID
+     * @param connectionId connection ID
      * @return executor service
      */
-    public static ExecutorService getExecutorService(final boolean 
isOccupyThreadForPerConnection, final boolean supportHint, final 
TransactionType transactionType, final ChannelId channelId) {
-        return isOccupyThreadForPerConnection(isOccupyThreadForPerConnection, 
supportHint, transactionType)
-                ? ChannelThreadExecutorGroup.getInstance().get(channelId) : 
UserExecutorGroup.getInstance().getExecutorService();
-    }
-    
-    private static boolean isOccupyThreadForPerConnection(final boolean 
isOccupyThreadForPerConnection, final boolean supportHint, final 
TransactionType transactionType) {
-        return isOccupyThreadForPerConnection || supportHint || 
TransactionType.isDistributedTransaction(transactionType);
+    public static ExecutorService getExecutorService(final boolean 
isOccupyThreadForPerConnection, final boolean supportHint, final 
TransactionType transactionType, final int connectionId) {
+        return isOccupyThreadForPerConnection || supportHint || 
TransactionType.isDistributedTransaction(transactionType)
+                ? 
ConnectionThreadExecutorGroup.getInstance().get(connectionId) : 
UserExecutorGroup.getInstance().getExecutorService();
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
new file mode 100644
index 0000000..f1e325b
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.executor;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Connection thread executor group.
+ *
+ * <p>
+ * Manage the thread for each backend connection invoking.
+ * This ensure XA transaction framework processed by current thread id.
+ * </p>
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ConnectionThreadExecutorGroup {
+    
+    private static final ConnectionThreadExecutorGroup INSTANCE = new 
ConnectionThreadExecutorGroup();
+    
+    private final Map<Integer, ExecutorService> executorServices = new 
ConcurrentHashMap<>();
+    
+    /**
+     * Get connection thread executor group.
+     *
+     * @return connection thread executor group
+     */
+    public static ConnectionThreadExecutorGroup getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Register connection.
+     *
+     * @param connectionId connection id
+     */
+    public void register(final int connectionId) {
+        executorServices.put(connectionId, 
Executors.newSingleThreadExecutor());
+    }
+    
+    /**
+     * Get executor service of connection.
+     *
+     * @param connectionId connection id
+     * @return executor service of current connection
+     */
+    public ExecutorService get(final int connectionId) {
+        return executorServices.get(connectionId);
+    }
+    
+    /**
+     * Unregister connection and await termination.
+     *
+     * @param connectionId connection id
+     */
+    public void unregisterAndAwaitTermination(final int connectionId) {
+        ExecutorService executorService = 
executorServices.remove(connectionId);
+        executorService.shutdown();
+        try {
+            executorService.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException ignored) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index 24e7201..1082eaf 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKe
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationResult;
-import 
org.apache.shardingsphere.proxy.frontend.executor.ChannelThreadExecutorGroup;
+import 
org.apache.shardingsphere.proxy.frontend.executor.ConnectionThreadExecutorGroup;
 import 
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
 import org.apache.shardingsphere.proxy.frontend.state.ProxyStateContext;
 import 
org.apache.shardingsphere.readwritesplitting.route.engine.impl.PrimaryVisitedManager;
@@ -53,8 +53,9 @@ public final class FrontendChannelInboundHandler extends 
ChannelInboundHandlerAd
     
     @Override
     public void channelActive(final ChannelHandlerContext context) {
-        
ChannelThreadExecutorGroup.getInstance().register(context.channel().id());
-        
backendConnection.setConnectionId(databaseProtocolFrontendEngine.getAuthenticationEngine().handshake(context));
+        int connectionId = 
databaseProtocolFrontendEngine.getAuthenticationEngine().handshake(context);
+        ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+        backendConnection.setConnectionId(connectionId);
     }
     
     @Override
@@ -87,17 +88,17 @@ public final class FrontendChannelInboundHandler extends 
ChannelInboundHandlerAd
     @Override
     public void channelInactive(final ChannelHandlerContext context) {
         context.fireChannelInactive();
-        closeAllResources(context);
+        closeAllResources();
     }
     
-    private void closeAllResources(final ChannelHandlerContext context) {
-        databaseProtocolFrontendEngine.release(backendConnection);
+    private void closeAllResources() {
         PrimaryVisitedManager.clear();
         backendConnection.closeResultSets();
         backendConnection.closeStatements();
         backendConnection.closeConnections(true);
         backendConnection.closeCalciteExecutor();
-        
ChannelThreadExecutorGroup.getInstance().unregister(context.channel().id());
+        
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(backendConnection.getConnectionId());
+        databaseProtocolFrontendEngine.release(backendConnection);
     }
     
     @Override
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
index 2cad095..925c6d0 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
 import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
 
 /**
  * OK proxy state.
@@ -39,12 +38,7 @@ public final class OKProxyState implements ProxyState {
         boolean supportHint = 
ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.PROXY_HINT_ENABLED);
         boolean isOccupyThreadForPerConnection = 
databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
         ExecutorService executorService = 
CommandExecutorSelector.getExecutorService(
-                isOccupyThreadForPerConnection, supportHint, 
backendConnection.getTransactionStatus().getTransactionType(), 
context.channel().id());
-        backendConnection.getSubmittedTaskCount().incrementAndGet();
-        try {
-            executorService.execute(new 
CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, 
message));
-        } catch (final RejectedExecutionException ignored) {
-            backendConnection.getSubmittedTaskCount().decrementAndGet();
-        }
+                isOccupyThreadForPerConnection, supportHint, 
backendConnection.getTransactionStatus().getTransactionType(), 
backendConnection.getConnectionId());
+        executorService.execute(new 
CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, 
message));
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelectorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelectorTest.java
index 1ca3693..a68a328 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelectorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelectorTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.proxy.frontend.executor;
 
-import io.netty.channel.ChannelId;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 import org.junit.Test;
 
@@ -25,34 +24,33 @@ import java.util.concurrent.ExecutorService;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
 
 public final class CommandExecutorSelectorTest {
     
     @Test
     public void assertGetExecutorServiceWithLocal() {
-        ChannelId channelId = mock(ChannelId.class);
-        assertThat(CommandExecutorSelector.getExecutorService(false, false, 
TransactionType.LOCAL, channelId), instanceOf(ExecutorService.class));
+        int connectionId = 1;
+        assertThat(CommandExecutorSelector.getExecutorService(false, false, 
TransactionType.LOCAL, connectionId), instanceOf(ExecutorService.class));
     }
     
     @Test
     public void assertGetExecutorServiceWithOccupyThreadForPerConnection() {
-        ChannelId channelId = mock(ChannelId.class);
-        ChannelThreadExecutorGroup.getInstance().register(channelId);
-        assertThat(CommandExecutorSelector.getExecutorService(true, false, 
TransactionType.LOCAL, channelId), instanceOf(ExecutorService.class));
+        int connectionId = 2;
+        ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+        assertThat(CommandExecutorSelector.getExecutorService(true, false, 
TransactionType.LOCAL, connectionId), instanceOf(ExecutorService.class));
     }
     
     @Test
     public void assertGetExecutorServiceWithXA() {
-        ChannelId channelId = mock(ChannelId.class);
-        ChannelThreadExecutorGroup.getInstance().register(channelId);
-        assertThat(CommandExecutorSelector.getExecutorService(false, false, 
TransactionType.XA, channelId), instanceOf(ExecutorService.class));
+        int connectionId = 3;
+        ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+        assertThat(CommandExecutorSelector.getExecutorService(false, false, 
TransactionType.XA, connectionId), instanceOf(ExecutorService.class));
     }
     
     @Test
     public void assertGetExecutorServiceWithBASE() {
-        ChannelId channelId = mock(ChannelId.class);
-        ChannelThreadExecutorGroup.getInstance().register(channelId);
-        assertThat(CommandExecutorSelector.getExecutorService(false, false, 
TransactionType.BASE, channelId), instanceOf(ExecutorService.class));
+        int connectionId = 4;
+        ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+        assertThat(CommandExecutorSelector.getExecutorService(false, false, 
TransactionType.BASE, connectionId), instanceOf(ExecutorService.class));
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroupTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroupTest.java
similarity index 61%
rename from 
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroupTest.java
rename to 
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroupTest.java
index 6de8ce13..2cf9940 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroupTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroupTest.java
@@ -17,28 +17,26 @@
 
 package org.apache.shardingsphere.proxy.frontend.executor;
 
-import io.netty.channel.ChannelId;
 import org.junit.Test;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
 
-public final class ChannelThreadExecutorGroupTest {
+public final class ConnectionThreadExecutorGroupTest {
     
     @Test
     public void assertRegister() {
-        ChannelId channelId = mock(ChannelId.class);
-        ChannelThreadExecutorGroup.getInstance().register(channelId);
-        assertNotNull(ChannelThreadExecutorGroup.getInstance().get(channelId));
-        ChannelThreadExecutorGroup.getInstance().unregister(channelId);
+        int connectionId = 1;
+        ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+        
assertNotNull(ConnectionThreadExecutorGroup.getInstance().get(connectionId));
+        
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionId);
     }
     
     @Test
     public void assertUnregister() {
-        ChannelId channelId = mock(ChannelId.class);
-        ChannelThreadExecutorGroup.getInstance().register(channelId);
-        ChannelThreadExecutorGroup.getInstance().unregister(channelId);
-        assertNull(ChannelThreadExecutorGroup.getInstance().get(channelId));
+        int connectionId = 2;
+        ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+        
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionId);
+        
assertNull(ConnectionThreadExecutorGroup.getInstance().get(connectionId));
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
index 2f64869..46a9f81 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
@@ -46,20 +46,9 @@ public final class PostgreSQLFrontendEngine implements 
DatabaseProtocolFrontendE
     
     @Override
     public void release(final BackendConnection backendConnection) {
-        waitingForFinish(backendConnection);
         
PostgreSQLBinaryStatementRegistry.getInstance().unregister(backendConnection.getConnectionId());
     }
     
-    private void waitingForFinish(final BackendConnection backendConnection) {
-        while (backendConnection.getSubmittedTaskCount().get() > 0) {
-            try {
-                Thread.sleep(500L);
-            } catch (final InterruptedException ex) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-    
     @Override
     public String getDatabaseType() {
         return "PostgreSQL";
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngineTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngineTest.java
index 4a938c9..d1f3da0 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngineTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngineTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.proxy.frontend.postgresql;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
 import 
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import 
org.apache.shardingsphere.proxy.frontend.executor.ConnectionThreadExecutorGroup;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -47,6 +48,8 @@ public final class PostgreSQLFrontendEngineTest {
         registry.register(connectionId);
         assertNotNull(registry.get(connectionId));
         PostgreSQLFrontendEngine frontendEngine = new 
PostgreSQLFrontendEngine();
+        ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+        
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionId);
         frontendEngine.release(backendConnection);
         assertNull(registry.get(connectionId));
     }

Reply via email to