This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9ea6e12 Fixes #9341 (#9369)
9ea6e12 is described below
commit 9ea6e1257b031dd1ae1dc25d02f4b9377482a61b
Author: Haoran Meng <[email protected]>
AuthorDate: Sun Feb 7 16:22:40 2021 +0800
Fixes #9341 (#9369)
---
.../proxy/frontend/state/impl/LockProxyState.java | 21 ++-------------------
1 file changed, 2 insertions(+), 19 deletions(-)
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
index 766dd04..f8eac7d 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java
@@ -21,19 +21,15 @@ import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.state.StateType;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.BackendException;
-import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException;
import
org.apache.shardingsphere.proxy.backend.exception.LockWaitTimeoutException;
-import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;
-import
org.apache.shardingsphere.proxy.frontend.executor.CommandExecutorSelector;
import
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
+import org.apache.shardingsphere.proxy.frontend.state.ProxyStateContext;
import java.util.Optional;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -45,11 +41,7 @@ public final class LockProxyState implements ProxyState {
@Override
public void execute(final ChannelHandlerContext context, final Object
message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine,
final BackendConnection backendConnection) {
block(context, databaseProtocolFrontendEngine);
- if (ProxyContext.getInstance().getStateContext().getCurrentState() ==
StateType.OK) {
- doExecute(context, message, databaseProtocolFrontendEngine,
backendConnection);
- } else if
(ProxyContext.getInstance().getStateContext().getCurrentState() ==
StateType.CIRCUIT_BREAK) {
- doError(context, databaseProtocolFrontendEngine, new
CircuitBreakException());
- }
+ ProxyStateContext.execute(context, message,
databaseProtocolFrontendEngine, backendConnection);
}
private void block(final ChannelHandlerContext context, final
DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine) {
@@ -59,15 +51,6 @@ public final class LockProxyState implements ProxyState {
}
}
- private void doExecute(final ChannelHandlerContext context, final Object
message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine,
final BackendConnection backendConnection) {
- boolean supportHint =
ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.PROXY_HINT_ENABLED);
- boolean isOccupyThreadForPerConnection =
databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
- ExecutorService executorService =
CommandExecutorSelector.getExecutorService(
- isOccupyThreadForPerConnection, supportHint,
backendConnection.getTransactionStatus().getTransactionType(),
context.channel().id());
- Runnable commandExecutorTask = new
CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context,
message);
- executorService.execute(commandExecutorTask);
- }
-
private void doError(final ChannelHandlerContext context, final
DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final
BackendException backendException) {
context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(backendException));
Optional<DatabasePacket<?>> databasePacket =
databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket();