This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3e02fcd Refactor ChannelThreadExecutorGroup to ConnectionThreadExecutorGroup (#10260)
3e02fcd is described below
commit 3e02fcd28a103e356321026cf4b70aec0f9677a7
Author: 吴伟杰 <[email protected]>
AuthorDate: Wed May 12 21:45:26 2021 +0800
Refactor ChannelThreadExecutorGroup to ConnectionThreadExecutorGroup (#10260)
---
.../jdbc/connection/BackendConnection.java | 3 -
.../frontend/command/CommandExecutorTask.java | 1 -
.../executor/ChannelThreadExecutorGroup.java | 80 --------------------
.../frontend/executor/CommandExecutorSelector.java | 13 +---
.../executor/ConnectionThreadExecutorGroup.java | 86 ++++++++++++++++++++++
.../netty/FrontendChannelInboundHandler.java | 15 ++--
.../proxy/frontend/state/impl/OKProxyState.java | 10 +--
.../executor/CommandExecutorSelectorTest.java | 24 +++---
...java => ConnectionThreadExecutorGroupTest.java} | 20 +++--
.../postgresql/PostgreSQLFrontendEngine.java | 11 ---
.../postgresql/PostgreSQLFrontendEngineTest.java | 3 +
11 files changed, 123 insertions(+), 143 deletions(-)
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 2d983e5..50a5bc3 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -50,7 +50,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Backend connection.
@@ -73,8 +72,6 @@ public final class BackendConnection implements
ExecutorJDBCManager {
@Setter
private volatile CalciteExecutor calciteExecutor;
- private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
-
private final Multimap<String, Connection> cachedConnections =
LinkedHashMultimap.create();
private final Collection<Statement> cachedStatements = new
CopyOnWriteArrayList<>();
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index a8c118b..0323a9f 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -74,7 +74,6 @@ public final class CommandExecutorTask implements Runnable {
// CHECKSTYLE:ON
processException(ex);
} finally {
- backendConnection.getSubmittedTaskCount().decrementAndGet();
Collection<SQLException> exceptions = closeExecutionResources();
if (isNeedFlush) {
context.flush();
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroup.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroup.java
deleted file mode 100644
index ddb5487..0000000
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroup.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.frontend.executor;
-
-import io.netty.channel.ChannelId;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Channel thread executor group.
- *
- * <p>
- * Manage the thread for each channel invoking.
- * This ensure XA transaction framework processed by current thread id.
- * </p>
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ChannelThreadExecutorGroup {
-
- private static final ChannelThreadExecutorGroup INSTANCE = new
ChannelThreadExecutorGroup();
-
- private final Map<ChannelId, ExecutorService> executorServices = new
ConcurrentHashMap<>();
-
- /**
- * Get channel thread executor group.
- *
- * @return channel thread executor group
- */
- public static ChannelThreadExecutorGroup getInstance() {
- return INSTANCE;
- }
-
- /**
- * Register channel.
- *
- * @param channelId channel id
- */
- public void register(final ChannelId channelId) {
- executorServices.put(channelId, Executors.newSingleThreadExecutor());
- }
-
- /**
- * Get executor service of current channel.
- *
- * @param channelId channel id
- * @return executor service of current channel
- */
- public ExecutorService get(final ChannelId channelId) {
- return executorServices.get(channelId);
- }
-
- /**
- * Unregister channel.
- *
- * @param channelId channel id
- */
- public void unregister(final ChannelId channelId) {
- executorServices.remove(channelId).shutdown();
- }
-}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelector.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelector.java
index bb3d81b..c199a8f 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelector.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelector.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.proxy.frontend.executor;
-import io.netty.channel.ChannelId;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.transaction.core.TransactionType;
@@ -36,15 +35,11 @@ public final class CommandExecutorSelector {
* @param isOccupyThreadForPerConnection is occupy thread for per
connection or not
* @param supportHint is support hint
* @param transactionType transaction type
- * @param channelId channel ID
+ * @param connectionId connection ID
* @return executor service
*/
- public static ExecutorService getExecutorService(final boolean
isOccupyThreadForPerConnection, final boolean supportHint, final
TransactionType transactionType, final ChannelId channelId) {
- return isOccupyThreadForPerConnection(isOccupyThreadForPerConnection,
supportHint, transactionType)
- ? ChannelThreadExecutorGroup.getInstance().get(channelId) :
UserExecutorGroup.getInstance().getExecutorService();
- }
-
- private static boolean isOccupyThreadForPerConnection(final boolean
isOccupyThreadForPerConnection, final boolean supportHint, final
TransactionType transactionType) {
- return isOccupyThreadForPerConnection || supportHint ||
TransactionType.isDistributedTransaction(transactionType);
+ public static ExecutorService getExecutorService(final boolean
isOccupyThreadForPerConnection, final boolean supportHint, final
TransactionType transactionType, final int connectionId) {
+ return isOccupyThreadForPerConnection || supportHint ||
TransactionType.isDistributedTransaction(transactionType)
+ ?
ConnectionThreadExecutorGroup.getInstance().get(connectionId) :
UserExecutorGroup.getInstance().getExecutorService();
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
new file mode 100644
index 0000000..f1e325b
--- /dev/null
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.executor;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Connection thread executor group.
+ *
+ * <p>
+ * Manage the thread for each backend connection invoking.
+ * This ensure XA transaction framework processed by current thread id.
+ * </p>
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ConnectionThreadExecutorGroup {
+
+ private static final ConnectionThreadExecutorGroup INSTANCE = new
ConnectionThreadExecutorGroup();
+
+ private final Map<Integer, ExecutorService> executorServices = new
ConcurrentHashMap<>();
+
+ /**
+ * Get connection thread executor group.
+ *
+ * @return connection thread executor group
+ */
+ public static ConnectionThreadExecutorGroup getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Register connection.
+ *
+ * @param connectionId connection id
+ */
+ public void register(final int connectionId) {
+ executorServices.put(connectionId,
Executors.newSingleThreadExecutor());
+ }
+
+ /**
+ * Get executor service of connection.
+ *
+ * @param connectionId connection id
+ * @return executor service of current connection
+ */
+ public ExecutorService get(final int connectionId) {
+ return executorServices.get(connectionId);
+ }
+
+ /**
+ * Unregister connection and await termination.
+ *
+ * @param connectionId connection id
+ */
+ public void unregisterAndAwaitTermination(final int connectionId) {
+ ExecutorService executorService =
executorServices.remove(connectionId);
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(Long.MAX_VALUE,
TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index 24e7201..1082eaf 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKe
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationResult;
-import
org.apache.shardingsphere.proxy.frontend.executor.ChannelThreadExecutorGroup;
+import
org.apache.shardingsphere.proxy.frontend.executor.ConnectionThreadExecutorGroup;
import
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyStateContext;
import
org.apache.shardingsphere.readwritesplitting.route.engine.impl.PrimaryVisitedManager;
@@ -53,8 +53,9 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
@Override
public void channelActive(final ChannelHandlerContext context) {
-
ChannelThreadExecutorGroup.getInstance().register(context.channel().id());
-
backendConnection.setConnectionId(databaseProtocolFrontendEngine.getAuthenticationEngine().handshake(context));
+ int connectionId =
databaseProtocolFrontendEngine.getAuthenticationEngine().handshake(context);
+ ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+ backendConnection.setConnectionId(connectionId);
}
@Override
@@ -87,17 +88,17 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
@Override
public void channelInactive(final ChannelHandlerContext context) {
context.fireChannelInactive();
- closeAllResources(context);
+ closeAllResources();
}
- private void closeAllResources(final ChannelHandlerContext context) {
- databaseProtocolFrontendEngine.release(backendConnection);
+ private void closeAllResources() {
PrimaryVisitedManager.clear();
backendConnection.closeResultSets();
backendConnection.closeStatements();
backendConnection.closeConnections(true);
backendConnection.closeCalciteExecutor();
-
ChannelThreadExecutorGroup.getInstance().unregister(context.channel().id());
+
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(backendConnection.getConnectionId());
+ databaseProtocolFrontendEngine.release(backendConnection);
}
@Override
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
index 2cad095..925c6d0 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
/**
* OK proxy state.
@@ -39,12 +38,7 @@ public final class OKProxyState implements ProxyState {
boolean supportHint =
ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.PROXY_HINT_ENABLED);
boolean isOccupyThreadForPerConnection =
databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
ExecutorService executorService =
CommandExecutorSelector.getExecutorService(
- isOccupyThreadForPerConnection, supportHint,
backendConnection.getTransactionStatus().getTransactionType(),
context.channel().id());
- backendConnection.getSubmittedTaskCount().incrementAndGet();
- try {
- executorService.execute(new
CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context,
message));
- } catch (final RejectedExecutionException ignored) {
- backendConnection.getSubmittedTaskCount().decrementAndGet();
- }
+ isOccupyThreadForPerConnection, supportHint,
backendConnection.getTransactionStatus().getTransactionType(),
backendConnection.getConnectionId());
+ executorService.execute(new
CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context,
message));
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelectorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelectorTest.java
index 1ca3693..a68a328 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelectorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/CommandExecutorSelectorTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.proxy.frontend.executor;
-import io.netty.channel.ChannelId;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.junit.Test;
@@ -25,34 +24,33 @@ import java.util.concurrent.ExecutorService;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
public final class CommandExecutorSelectorTest {
@Test
public void assertGetExecutorServiceWithLocal() {
- ChannelId channelId = mock(ChannelId.class);
- assertThat(CommandExecutorSelector.getExecutorService(false, false,
TransactionType.LOCAL, channelId), instanceOf(ExecutorService.class));
+ int connectionId = 1;
+ assertThat(CommandExecutorSelector.getExecutorService(false, false,
TransactionType.LOCAL, connectionId), instanceOf(ExecutorService.class));
}
@Test
public void assertGetExecutorServiceWithOccupyThreadForPerConnection() {
- ChannelId channelId = mock(ChannelId.class);
- ChannelThreadExecutorGroup.getInstance().register(channelId);
- assertThat(CommandExecutorSelector.getExecutorService(true, false,
TransactionType.LOCAL, channelId), instanceOf(ExecutorService.class));
+ int connectionId = 2;
+ ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+ assertThat(CommandExecutorSelector.getExecutorService(true, false,
TransactionType.LOCAL, connectionId), instanceOf(ExecutorService.class));
}
@Test
public void assertGetExecutorServiceWithXA() {
- ChannelId channelId = mock(ChannelId.class);
- ChannelThreadExecutorGroup.getInstance().register(channelId);
- assertThat(CommandExecutorSelector.getExecutorService(false, false,
TransactionType.XA, channelId), instanceOf(ExecutorService.class));
+ int connectionId = 3;
+ ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+ assertThat(CommandExecutorSelector.getExecutorService(false, false,
TransactionType.XA, connectionId), instanceOf(ExecutorService.class));
}
@Test
public void assertGetExecutorServiceWithBASE() {
- ChannelId channelId = mock(ChannelId.class);
- ChannelThreadExecutorGroup.getInstance().register(channelId);
- assertThat(CommandExecutorSelector.getExecutorService(false, false,
TransactionType.BASE, channelId), instanceOf(ExecutorService.class));
+ int connectionId = 4;
+ ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+ assertThat(CommandExecutorSelector.getExecutorService(false, false,
TransactionType.BASE, connectionId), instanceOf(ExecutorService.class));
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroupTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroupTest.java
similarity index 61%
rename from
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroupTest.java
rename to
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroupTest.java
index 6de8ce13..2cf9940 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ChannelThreadExecutorGroupTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroupTest.java
@@ -17,28 +17,26 @@
package org.apache.shardingsphere.proxy.frontend.executor;
-import io.netty.channel.ChannelId;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-public final class ChannelThreadExecutorGroupTest {
+public final class ConnectionThreadExecutorGroupTest {
@Test
public void assertRegister() {
- ChannelId channelId = mock(ChannelId.class);
- ChannelThreadExecutorGroup.getInstance().register(channelId);
- assertNotNull(ChannelThreadExecutorGroup.getInstance().get(channelId));
- ChannelThreadExecutorGroup.getInstance().unregister(channelId);
+ int connectionId = 1;
+ ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+
assertNotNull(ConnectionThreadExecutorGroup.getInstance().get(connectionId));
+
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionId);
}
@Test
public void assertUnregister() {
- ChannelId channelId = mock(ChannelId.class);
- ChannelThreadExecutorGroup.getInstance().register(channelId);
- ChannelThreadExecutorGroup.getInstance().unregister(channelId);
- assertNull(ChannelThreadExecutorGroup.getInstance().get(channelId));
+ int connectionId = 2;
+ ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionId);
+
assertNull(ConnectionThreadExecutorGroup.getInstance().get(connectionId));
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
index 2f64869..46a9f81 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
@@ -46,20 +46,9 @@ public final class PostgreSQLFrontendEngine implements
DatabaseProtocolFrontendE
@Override
public void release(final BackendConnection backendConnection) {
- waitingForFinish(backendConnection);
PostgreSQLBinaryStatementRegistry.getInstance().unregister(backendConnection.getConnectionId());
}
- private void waitingForFinish(final BackendConnection backendConnection) {
- while (backendConnection.getSubmittedTaskCount().get() > 0) {
- try {
- Thread.sleep(500L);
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
@Override
public String getDatabaseType() {
return "PostgreSQL";
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngineTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngineTest.java
index 4a938c9..d1f3da0 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngineTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngineTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.proxy.frontend.postgresql;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
import
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import
org.apache.shardingsphere.proxy.frontend.executor.ConnectionThreadExecutorGroup;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -47,6 +48,8 @@ public final class PostgreSQLFrontendEngineTest {
registry.register(connectionId);
assertNotNull(registry.get(connectionId));
PostgreSQLFrontendEngine frontendEngine = new
PostgreSQLFrontendEngine();
+ ConnectionThreadExecutorGroup.getInstance().register(connectionId);
+
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionId);
frontendEngine.release(backendConnection);
assertNull(registry.get(connectionId));
}