This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 442e2e50ce4 show processlist can check sleep connection (#21000)
442e2e50ce4 is described below
commit 442e2e50ce47639b90297488b9df888c916a5728
Author: natehuang <[email protected]>
AuthorDate: Thu Sep 22 17:48:36 2022 +0800
show processlist can check sleep connection (#21000)
* show processlist can check sleep connection
* show process distinguishes between proxy and jdbc
* move uuid to processEngine
---
.../kernel/model/ExecutionGroupContext.java | 4 +-
.../engine/driver/jdbc/JDBCExecutorCallback.java | 2 +-
.../sql/execute/engine/raw/RawExecutor.java | 6 +--
.../raw/callback/RawSQLExecutorCallback.java | 2 +-
.../executor/sql/process/ExecuteProcessEngine.java | 52 ++++++++++++++++++----
.../sql/process/model/ExecuteProcessConstants.java | 2 +-
.../sql/process/model/ExecuteProcessContext.java | 23 ++++++++--
.../model/yaml/YamlExecuteProcessContext.java | 4 ++
.../sql/process/spi/ExecuteProcessReporter.java | 13 ++++++
.../sql/process/ExecuteProcessEngineTest.java | 8 ++--
.../fixture/ExecuteProcessReporterFixture.java | 8 ++++
.../driver/executor/DriverJDBCExecutor.java | 18 ++++----
.../executor/FilterableTableScanExecutor.java | 6 +--
.../executor/TranslatableTableScanExecutor.java | 6 +--
.../process/GovernanceExecuteProcessReporter.java | 25 ++++++++++-
.../GovernanceExecuteProcessReporterTest.java | 4 +-
.../backend/communication/ProxySQLExecutor.java | 2 +
.../communication/ReactiveProxySQLExecutor.java | 1 +
.../jdbc/executor/ProxyJDBCExecutor.java | 6 +--
.../vertx/executor/ProxyReactiveExecutor.java | 6 +--
.../mysql/executor/ShowProcessListExecutor.java | 17 ++++---
.../proxy/backend/session/ConnectionSession.java | 2 +
.../netty/FrontendChannelInboundHandler.java | 4 ++
23 files changed, 168 insertions(+), 53 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
index 25b1dac7cef..da7e93bcd4c 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupContext.java
@@ -23,8 +23,6 @@ import lombok.Setter;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
/**
* Execution group context.
@@ -38,7 +36,7 @@ public final class ExecutionGroupContext<T> {
private final Collection<ExecutionGroup<T>> inputGroups;
- private final String executionID = new
UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+ private volatile String executionID;
private volatile String databaseName;
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index c33de217f23..b030381c864 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -121,7 +121,7 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
private void finishReport(final Map<String, Object> dataMap, final
SQLExecutionUnit executionUnit) {
if (dataMap.containsKey(ExecuteProcessConstants.EXECUTE_ID.name())) {
-
ExecuteProcessEngine.finish(dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString(),
executionUnit, eventBusContext);
+
ExecuteProcessEngine.finishExecution(dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString(),
executionUnit, eventBusContext);
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index 9384599a0f3..7ae30fcbbc2 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -60,14 +60,14 @@ public final class RawExecutor {
public List<ExecuteResult> execute(final
ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext, final
QueryContext queryContext,
final RawSQLExecutorCallback callback)
throws SQLException {
try {
- ExecuteProcessEngine.initialize(queryContext,
executionGroupContext, eventBusContext);
+ ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
// TODO Load query header for first query
List<ExecuteResult> results = execute(executionGroupContext,
(RawSQLExecutorCallback) null, callback);
-
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return results.isEmpty() || Objects.isNull(results.get(0)) ?
Collections
.singletonList(new UpdateResult(0, 0L)) : results;
} finally {
- ExecuteProcessEngine.clean();
+ ExecuteProcessEngine.cleanExecution();
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index c1b223fdd4b..26b050e6ae5 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -52,7 +52,7 @@ public final class RawSQLExecutorCallback implements
ExecutorCallback<RawSQLExec
if (dataMap.containsKey(ExecuteProcessConstants.EXECUTE_ID.name())) {
String executionID =
dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString();
for (RawSQLExecutionUnit each : inputs) {
- ExecuteProcessEngine.finish(executionID, each,
eventBusContext);
+ ExecuteProcessEngine.finishExecution(executionID, each,
eventBusContext);
}
}
return result;
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
index 0211478c4b0..b8934355d3e 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.executor.sql.process;
+import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.binder.QueryContext;
@@ -26,13 +27,17 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionU
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
import
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporterFactory;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLStatement;
+import java.util.Collections;
import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
/**
* Execute process engine.
@@ -41,14 +46,45 @@ import java.util.Optional;
public final class ExecuteProcessEngine {
/**
- * Initialize.
+ * Initialize connection.
+ *
+ * @param grantee grantee
+ * @param databaseName database name
+ * @param eventBusContext event bus context
+ * @return execution id
+ */
+ public static String initializeConnection(final Grantee grantee, final
String databaseName, final EventBusContext eventBusContext) {
+ ExecutionGroupContext<SQLExecutionUnit> executionGroupContext = new
ExecutionGroupContext<>(Collections.emptyList());
+ executionGroupContext.setExecutionID(new
UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", ""));
+ executionGroupContext.setGrantee(grantee);
+ executionGroupContext.setDatabaseName(databaseName);
+ Optional<ExecuteProcessReporter> reporter =
ExecuteProcessReporterFactory.getInstance();
+ reporter.ifPresent(executeProcessReporter ->
executeProcessReporter.report(executionGroupContext));
+ return executionGroupContext.getExecutionID();
+ }
+
+ /**
+ * Finish connection.
+ *
+ * @param executionID execution id
+ */
+ public static void finishConnection(final String executionID) {
+ Optional<ExecuteProcessReporter> reporter =
ExecuteProcessReporterFactory.getInstance();
+ reporter.ifPresent(executeProcessReporter ->
executeProcessReporter.reportRemove(executionID));
+ }
+
+ /**
+ * Initialize execution.
*
* @param queryContext query context
* @param executionGroupContext execution group context
* @param eventBusContext event bus context
*/
- public static void initialize(final QueryContext queryContext, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final
EventBusContext eventBusContext) {
+ public static void initializeExecution(final QueryContext queryContext,
final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext,
final EventBusContext eventBusContext) {
Optional<ExecuteProcessReporter> reporter =
ExecuteProcessReporterFactory.getInstance();
+ if (Strings.isNullOrEmpty(executionGroupContext.getExecutionID())) {
+ executionGroupContext.setExecutionID(new
UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", ""));
+ }
if (reporter.isPresent() &&
isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement()))
{
ExecutorDataMap.getValue().put(ExecuteProcessConstants.EXECUTE_ID.name(),
executionGroupContext.getExecutionID());
reporter.get().report(queryContext, executionGroupContext,
ExecuteProcessConstants.EXECUTE_STATUS_START, eventBusContext);
@@ -56,13 +92,13 @@ public final class ExecuteProcessEngine {
}
/**
- * Finish.
+ * Finish execution.
*
* @param executionID execution ID
* @param executionUnit execution unit
* @param eventBusContext event bus context
*/
- public static void finish(final String executionID, final SQLExecutionUnit
executionUnit, final EventBusContext eventBusContext) {
+ public static void finishExecution(final String executionID, final
SQLExecutionUnit executionUnit, final EventBusContext eventBusContext) {
Optional<ExecuteProcessReporter> reporter =
ExecuteProcessReporterFactory.getInstance();
if (reporter.isPresent() &&
ExecutorDataMap.getValue().containsKey(ExecuteProcessConstants.EXECUTE_ID.name()))
{
reporter.get().report(executionID, executionUnit,
ExecuteProcessConstants.EXECUTE_STATUS_DONE, eventBusContext);
@@ -70,12 +106,12 @@ public final class ExecuteProcessEngine {
}
/**
- * Finish.
+ * Finish execution.
*
* @param executionID execution ID
* @param eventBusContext event bus context
*/
- public static void finish(final String executionID, final EventBusContext
eventBusContext) {
+ public static void finishExecution(final String executionID, final
EventBusContext eventBusContext) {
Optional<ExecuteProcessReporter> reporter =
ExecuteProcessReporterFactory.getInstance();
if (reporter.isPresent() &&
ExecutorDataMap.getValue().containsKey(ExecuteProcessConstants.EXECUTE_ID.name()))
{
reporter.get().report(executionID,
ExecuteProcessConstants.EXECUTE_STATUS_DONE, eventBusContext);
@@ -83,9 +119,9 @@ public final class ExecuteProcessEngine {
}
/**
- * Clean.
+ * Clean execution.
*/
- public static void clean() {
+ public static void cleanExecution() {
Optional<ExecuteProcessReporter> reporter =
ExecuteProcessReporterFactory.getInstance();
if (reporter.isPresent() &&
ExecutorDataMap.getValue().containsKey(ExecuteProcessConstants.EXECUTE_ID.name()))
{
reporter.get().reportClean(ExecutorDataMap.getValue().get(ExecuteProcessConstants.EXECUTE_ID.name()).toString());
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java
index e551380a8de..67117eec151 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java
@@ -22,5 +22,5 @@ package
org.apache.shardingsphere.infra.executor.sql.process.model;
*/
public enum ExecuteProcessConstants {
- EXECUTE_ID, EXECUTE_STATUS_START, EXECUTE_STATUS_DONE
+ EXECUTE_ID, EXECUTE_STATUS_START, EXECUTE_STATUS_DONE, EXECUTE_STATUS_SLEEP
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
index 7f78216cd8e..f951d137508 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessContext.java
@@ -44,22 +44,29 @@ public final class ExecuteProcessContext {
private final String hostname;
- private final String sql;
+ private String sql;
private final Map<String, ExecuteProcessUnit> processUnits = new
HashMap<>();
private final Collection<Statement> processStatements = new LinkedList<>();
- private final long startTimeMillis = System.currentTimeMillis();
+ private long startTimeMillis = System.currentTimeMillis();
- public ExecuteProcessContext(final String sql, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final
ExecuteProcessConstants constants) {
+ private ExecuteProcessConstants executeProcessConstants;
+
+ private final boolean proxyContext;
+
+ public ExecuteProcessContext(final String sql, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final
ExecuteProcessConstants constants,
+ final boolean isProxyContext) {
this.executionID = executionGroupContext.getExecutionID();
this.sql = sql;
this.databaseName = executionGroupContext.getDatabaseName();
Grantee grantee = executionGroupContext.getGrantee();
this.username = null != grantee ? grantee.getUsername() : null;
this.hostname = null != grantee ? grantee.getHostname() : null;
+ executeProcessConstants = constants;
addProcessUnitsAndStatements(executionGroupContext, constants);
+ proxyContext = isProxyContext;
}
private void addProcessUnitsAndStatements(final ExecutionGroupContext<?
extends SQLExecutionUnit> executionGroupContext, final ExecuteProcessConstants
constants) {
@@ -73,4 +80,14 @@ public final class ExecuteProcessContext {
}
}
}
+
+ /**
+ * Reset execute process context to sleep.
+ */
+ public void resetExecuteProcessContextToSleep() {
+ this.sql = "";
+ this.startTimeMillis = System.currentTimeMillis();
+ this.executeProcessConstants =
ExecuteProcessConstants.EXECUTE_STATUS_SLEEP;
+ }
+
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlExecuteProcessContext.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlExecuteProcessContext.java
index f15807026c1..c18de1c7ef9 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlExecuteProcessContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlExecuteProcessContext.java
@@ -21,6 +21,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
@@ -50,6 +51,8 @@ public final class YamlExecuteProcessContext {
private Long startTimeMillis;
+ private ExecuteProcessConstants executeProcessConstants;
+
public YamlExecuteProcessContext(final ExecuteProcessContext
executeProcessContext) {
executionID = executeProcessContext.getExecutionID();
databaseName = executeProcessContext.getDatabaseName();
@@ -61,5 +64,6 @@ public final class YamlExecuteProcessContext {
unitStatuses.add(new YamlExecuteProcessUnit(each));
}
startTimeMillis = executeProcessContext.getStartTimeMillis();
+ executeProcessConstants =
executeProcessContext.getExecuteProcessConstants();
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
index 8d43351824e..7b8c949b1e0 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
@@ -31,6 +31,12 @@ import
org.apache.shardingsphere.infra.util.spi.type.optional.OptionalSPI;
@SingletonSPI
public interface ExecuteProcessReporter extends OptionalSPI {
+ /**
+ * Report this connection for proxy.
+ * @param executionGroupContext execution group context
+ */
+ void report(ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext);
+
/**
* Report the summary of this task.
* @param queryContext query context
@@ -63,4 +69,11 @@ public interface ExecuteProcessReporter extends OptionalSPI {
* @param executionID execution ID
*/
void reportClean(String executionID);
+
+ /**
+ * Report remove process context.
+ *
+ * @param executionID execution ID
+ */
+ void reportRemove(String executionID);
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngineTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngineTest.java
index 54474f98a2f..104d8655afc 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngineTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngineTest.java
@@ -46,22 +46,22 @@ public final class ExecuteProcessEngineTest {
@Before
public void setUp() {
executionGroupContext = createMockedExecutionGroups();
- ExecuteProcessEngine.initialize(createQueryContext(),
executionGroupContext, eventBusContext);
+ ExecuteProcessEngine.initializeExecution(createQueryContext(),
executionGroupContext, eventBusContext);
assertThat(ExecutorDataMap.getValue().get("EXECUTE_ID"),
is(executionGroupContext.getExecutionID()));
assertThat(ExecuteProcessReporterFixture.ACTIONS.get(0), is("Report
the summary of this task."));
}
@Test
public void assertFinish() {
- ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
mock(RawSQLExecutionUnit.class), eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
mock(RawSQLExecutionUnit.class), eventBusContext);
assertThat(ExecuteProcessReporterFixture.ACTIONS.get(1), is("Report a
unit of this task."));
- ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
assertThat(ExecuteProcessReporterFixture.ACTIONS.get(2), is("Report
this task on completion."));
}
@Test
public void assertClean() {
- ExecuteProcessEngine.clean();
+ ExecuteProcessEngine.cleanExecution();
assertTrue(ExecutorDataMap.getValue().isEmpty());
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/ExecuteProcessReporterFixture.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/ExecuteProcessReporterFixture.java
index 75a926e1ee2..95155043b0b 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/ExecuteProcessReporterFixture.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/ExecuteProcessReporterFixture.java
@@ -30,6 +30,10 @@ public final class ExecuteProcessReporterFixture implements
ExecuteProcessReport
public static final LinkedList<String> ACTIONS = new LinkedList<>();
+ @Override
+ public void report(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext) {
+ }
+
@Override
public void report(final QueryContext queryContext, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final
ExecuteProcessConstants constants,
final EventBusContext eventBusContext) {
@@ -49,4 +53,8 @@ public final class ExecuteProcessReporterFixture implements
ExecuteProcessReport
@Override
public void reportClean(final String executionID) {
}
+
+ @Override
+ public void reportRemove(final String executionID) {
+ }
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index fa11f1019d6..bd595960d98 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -78,12 +78,12 @@ public final class DriverJDBCExecutor {
public List<QueryResult> executeQuery(final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final QueryContext queryContext,
final ExecuteQueryCallback callback) throws SQLException {
try {
- ExecuteProcessEngine.initialize(queryContext,
executionGroupContext, eventBusContext);
+ ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
List<QueryResult> result =
jdbcExecutor.execute(executionGroupContext, callback);
-
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return result;
} finally {
- ExecuteProcessEngine.clean();
+ ExecuteProcessEngine.cleanExecution();
}
}
@@ -100,14 +100,14 @@ public final class DriverJDBCExecutor {
public int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext,
final QueryContext queryContext, final
Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback)
throws SQLException {
try {
- ExecuteProcessEngine.initialize(queryContext,
executionGroupContext, eventBusContext);
+ ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
SQLStatementContext<?> sqlStatementContext =
queryContext.getSqlStatementContext();
List<Integer> results = doExecute(executionGroupContext,
sqlStatementContext, routeUnits, callback);
int result =
isNeedAccumulate(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
sqlStatementContext) ? accumulate(results) : results.get(0);
-
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return result;
} finally {
- ExecuteProcessEngine.clean();
+ ExecuteProcessEngine.cleanExecution();
}
}
@@ -141,13 +141,13 @@ public final class DriverJDBCExecutor {
public boolean execute(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext, final QueryContext queryContext,
final Collection<RouteUnit> routeUnits, final
JDBCExecutorCallback<Boolean> callback) throws SQLException {
try {
- ExecuteProcessEngine.initialize(queryContext,
executionGroupContext, eventBusContext);
+ ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
List<Boolean> results = doExecute(executionGroupContext,
queryContext.getSqlStatementContext(), routeUnits, callback);
boolean result = null != results && !results.isEmpty() && null !=
results.get(0) && results.get(0);
-
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return result;
} finally {
- ExecuteProcessEngine.clean();
+ ExecuteProcessEngine.cleanExecution();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-sql-federation/shardingsphere-sql-federation-executor/shardingsphere-sql-federation-executor-core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java b/shardingsphere-kernel/shardingsphere-sql-federation/shardingsphere-sql-federation-executor/shardingsphere-sql-federation-executor-core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
index 15f6aee51bc..40ee83e127e 100644
--- a/shardingsphere-kernel/shardingsphere-sql-federation/shardingsphere-sql-federation-executor/shardingsphere-sql-federation-executor-core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-sql-federation/shardingsphere-sql-federation-executor/shardingsphere-sql-federation-executor-core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
@@ -130,9 +130,9 @@ public final class FilterableTableScanExecutor implements
TableScanExecutor {
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits());
setParameters(executionGroupContext.getInputGroups());
- ExecuteProcessEngine.initialize(context.getQueryContext(),
executionGroupContext, eventBusContext);
+
ExecuteProcessEngine.initializeExecution(context.getQueryContext(),
executionGroupContext, eventBusContext);
List<QueryResult> queryResults = execute(executionGroupContext,
databaseType);
-
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
// TODO need to get session context
MergeEngine mergeEngine = new MergeEngine(database,
executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults,
queryContext.getSqlStatementContext());
@@ -141,7 +141,7 @@ public final class FilterableTableScanExecutor implements
TableScanExecutor {
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
} finally {
- ExecuteProcessEngine.clean();
+ ExecuteProcessEngine.cleanExecution();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-sql-federation/shardingsphere-sql-federation-executor/shardingsphere-sql-federation-executor-core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java b/shardingsphere-kernel/shardingsphere-sql-federation/shardingsphere-sql-federation-executor/shardingsphere-sql-federation-executor-core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
index a819e5ad500..9846b54f916 100644
--- a/shardingsphere-kernel/shardingsphere-sql-federation/shardingsphere-sql-federation-executor/shardingsphere-sql-federation-executor-core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-sql-federation/shardingsphere-sql-federation-executor/shardingsphere-sql-federation-executor-core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
@@ -135,9 +135,9 @@ public final class TranslatableTableScanExecutor implements
TableScanExecutor {
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits());
setParameters(executionGroupContext.getInputGroups());
- ExecuteProcessEngine.initialize(context.getQueryContext(),
executionGroupContext, eventBusContext);
+
ExecuteProcessEngine.initializeExecution(context.getQueryContext(),
executionGroupContext, eventBusContext);
List<QueryResult> queryResults = execute(executionGroupContext,
databaseType);
-
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
MergeEngine mergeEngine = new MergeEngine(database,
executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults,
queryContext.getSqlStatementContext());
Collection<Statement> statements =
getStatements(executionGroupContext.getInputGroups());
@@ -145,7 +145,7 @@ public final class TranslatableTableScanExecutor implements
TableScanExecutor {
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
} finally {
- ExecuteProcessEngine.clean();
+ ExecuteProcessEngine.cleanExecution();
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
index f9f000384ca..d3107078659 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
@@ -33,10 +33,18 @@ import java.util.Optional;
*/
public final class GovernanceExecuteProcessReporter implements
ExecuteProcessReporter {
+ @Override
+ public void report(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext) {
+ ExecuteProcessContext executeProcessContext = new
ExecuteProcessContext("", executionGroupContext,
ExecuteProcessConstants.EXECUTE_STATUS_SLEEP, true);
+
ShowProcessListManager.getInstance().putProcessContext(executeProcessContext.getExecutionID(),
executeProcessContext);
+ }
+
@Override
public void report(final QueryContext queryContext, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext,
final ExecuteProcessConstants constants, final
EventBusContext eventBusContext) {
- ExecuteProcessContext executeProcessContext = new
ExecuteProcessContext(queryContext.getSql(), executionGroupContext, constants);
+ ExecuteProcessContext originExecuteProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionGroupContext.getExecutionID());
+ boolean isProxyContext = null != originExecuteProcessContext &&
originExecuteProcessContext.isProxyContext();
+ ExecuteProcessContext executeProcessContext = new
ExecuteProcessContext(queryContext.getSql(), executionGroupContext, constants,
isProxyContext);
ShowProcessListManager.getInstance().putProcessContext(executeProcessContext.getExecutionID(),
executeProcessContext);
ShowProcessListManager.getInstance().putProcessStatement(executeProcessContext.getExecutionID(),
executeProcessContext.getProcessStatements());
}
@@ -54,7 +62,20 @@ public final class GovernanceExecuteProcessReporter
implements ExecuteProcessRep
@Override
public void reportClean(final String executionID) {
- ShowProcessListManager.getInstance().removeProcessContext(executionID);
ShowProcessListManager.getInstance().removeProcessStatement(executionID);
+
Optional.ofNullable(ShowProcessListManager.getInstance().getProcessContext(executionID)).ifPresent(
+ executeProcessContext -> {
+ if (executeProcessContext.isProxyContext()) {
+
executeProcessContext.resetExecuteProcessContextToSleep();
+ } else {
+
ShowProcessListManager.getInstance().removeProcessContext(executionID);
+ }
+ });
+ }
+
+ @Override
+ public void reportRemove(final String executionID) {
+
ShowProcessListManager.getInstance().removeProcessStatement(executionID);
+ ShowProcessListManager.getInstance().removeProcessContext(executionID);
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
index 4a0961f4f83..43360267220 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
@@ -83,8 +83,10 @@ public final class GovernanceExecuteProcessReporterTest {
@Test
public void assertReportClean() {
+ ExecuteProcessContext executeProcessContext =
mock(ExecuteProcessContext.class);
+
when(showProcessListManager.getProcessContext("foo_id")).thenReturn(executeProcessContext);
reporter.reportClean("foo_id");
- verify(showProcessListManager,
times(1)).removeProcessContext(eq("foo_id"));
+ verify(showProcessListManager,
times(1)).removeProcessStatement(eq("foo_id"));
}
@After
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 1a5d01c5e15..fb1bc24b6da 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -172,6 +172,7 @@ public final class ProxySQLExecutor {
}
executionGroupContext.setDatabaseName(backendConnection.getConnectionSession().getDatabaseName());
executionGroupContext.setGrantee(backendConnection.getConnectionSession().getGrantee());
+
executionGroupContext.setExecutionID(backendConnection.getConnectionSession().getExecutionId());
// TODO handle query header
return rawExecutor.execute(executionGroupContext,
executionContext.getQueryContext(), new
RawSQLExecutorCallback(ProxyContext.getInstance().getContextManager().getInstanceContext()
.getEventBusContext()));
@@ -190,6 +191,7 @@ public final class ProxySQLExecutor {
}
executionGroupContext.setDatabaseName(backendConnection.getConnectionSession().getDatabaseName());
executionGroupContext.setGrantee(backendConnection.getConnectionSession().getGrantee());
+
executionGroupContext.setExecutionID(backendConnection.getConnectionSession().getExecutionId());
return jdbcExecutor.execute(executionContext.getQueryContext(),
executionGroupContext, isReturnGeneratedKeys, isExceptionThrown);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
index 14f15c667a1..759609f4156 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
@@ -133,6 +133,7 @@ public final class ReactiveProxySQLExecutor {
}
executionGroupContext.setDatabaseName(backendConnection.getConnectionSession().getDatabaseName());
executionGroupContext.setGrantee(backendConnection.getConnectionSession().getGrantee());
+
executionGroupContext.setExecutionID(backendConnection.getConnectionSession().getExecutionId());
return reactiveExecutor.execute(executionContext.getQueryContext(),
executionGroupContext);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index e98d425ec64..bbb0b956953 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -69,17 +69,17 @@ public final class ProxyJDBCExecutor {
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
DatabaseType protocolType = database.getProtocolType();
DatabaseType databaseType =
database.getResource().getDatabaseType();
- ExecuteProcessEngine.initialize(queryContext,
executionGroupContext, eventBusContext);
+ ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
SQLStatementContext<?> context =
queryContext.getSqlStatementContext();
List<ExecuteResult> result =
jdbcExecutor.execute(executionGroupContext,
ProxyJDBCExecutorCallbackFactory.newInstance(type,
protocolType, databaseType, context.getSqlStatement(),
databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
true),
ProxyJDBCExecutorCallbackFactory.newInstance(type,
protocolType, databaseType, context.getSqlStatement(),
databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
false));
-
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return result;
} finally {
- ExecuteProcessEngine.clean();
+ ExecuteProcessEngine.cleanExecution();
}
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/executor/ProxyReactiveExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/executor/ProxyReactiveExecutor.java
index 26d9695ede3..82dd3cdf4f4 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/executor/ProxyReactiveExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/executor/ProxyReactiveExecutor.java
@@ -50,13 +50,13 @@ public final class ProxyReactiveExecutor {
*/
public Future<List<ExecuteResult>> execute(final QueryContext
queryContext, final ExecutionGroupContext<VertxExecutionUnit>
executionGroupContext) {
EventBusContext eventBusContext =
ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
- ExecuteProcessEngine.initialize(queryContext, executionGroupContext,
eventBusContext);
+ ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
List<Future<ExecuteResult>> futures =
vertxExecutor.execute(executionGroupContext, new VertxExecutorCallback());
return CompositeFuture.all(new
ArrayList<>(futures)).compose(compositeFuture -> {
-
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(),
eventBusContext);
+
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return
Future.succeededFuture(compositeFuture.<ExecuteResult>list());
}).eventually(unused -> {
- ExecuteProcessEngine.clean();
+ ExecuteProcessEngine.cleanExecution();
return Future.succeededFuture();
});
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/admin/mysql/executor/ShowProcessListExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/admin/mysql/executor/ShowProcessListExecutor.java
index fc0a4226665..cd9b70f5595 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/admin/mysql/executor/ShowProcessListExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/admin/mysql/executor/ShowProcessListExecutor.java
@@ -95,12 +95,19 @@ public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor
rowValues.add(processContext.getUsername());
rowValues.add(processContext.getHostname());
rowValues.add(processContext.getDatabaseName());
- rowValues.add("Execute");
+ rowValues.add(processContext.getExecuteProcessConstants() ==
ExecuteProcessConstants.EXECUTE_STATUS_SLEEP ? "Sleep" : "Execute");
rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() -
processContext.getStartTimeMillis()));
- int processDoneCount =
processContext.getUnitStatuses().stream().map(each ->
ExecuteProcessConstants.EXECUTE_STATUS_DONE == each.getStatus() ? 1 :
0).reduce(0, Integer::sum);
- String statePrefix = "Executing ";
- rowValues.add(statePrefix + processDoneCount + "/" +
processContext.getUnitStatuses().size());
- String sql = processContext.getSql();
+ String sql = null;
+ if (processContext.getExecuteProcessConstants() !=
ExecuteProcessConstants.EXECUTE_STATUS_SLEEP) {
+ int processDoneCount =
processContext.getUnitStatuses().stream()
+ .map(each ->
ExecuteProcessConstants.EXECUTE_STATUS_DONE == each.getStatus() ? 1 : 0)
+ .reduce(0, Integer::sum);
+ String statePrefix = "Executing ";
+ rowValues.add(statePrefix + processDoneCount + "/" +
processContext.getUnitStatuses().size());
+ sql = processContext.getSql();
+ } else {
+ rowValues.add("");
+ }
if (null != sql && sql.length() > 100) {
sql = sql.substring(0, 100);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
index db128f69da9..1d7e261fdc1 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
@@ -76,6 +76,8 @@ public final class ConnectionSession {
private final RequiredSessionVariableRecorder
requiredSessionVariableRecorder = new RequiredSessionVariableRecorder();
+ private volatile String executionId;
+
private QueryContext queryContext;
public ConnectionSession(final DatabaseType databaseType, final
TransactionType initialTransactionType, final AttributeMap attributeMap) {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index ee015e74e8c..b359dad73eb 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.db.protocol.CommonConstants;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import
org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -77,6 +78,8 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
if (authResult.isFinished()) {
connectionSession.setGrantee(new
Grantee(authResult.getUsername(), authResult.getHostname()));
connectionSession.setCurrentDatabase(authResult.getDatabase());
+
connectionSession.setExecutionId(ExecuteProcessEngine.initializeConnection(connectionSession.getGrantee(),
connectionSession.getDatabaseName(),
+
ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext()));
}
return authResult.isFinished();
// CHECKSTYLE:OFF
@@ -102,6 +105,7 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
} catch (final BackendConnectionException ex) {
log.error("Exception occurred when frontend connection [{}]
disconnected", connectionSession.getConnectionId(), ex);
}
+
ExecuteProcessEngine.finishConnection(connectionSession.getExecutionId());
databaseProtocolFrontendEngine.release(connectionSession);
}