This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 195f72dac20 Refactor show processlist and kill logic (#30044)
195f72dac20 is described below
commit 195f72dac209e4d4fb4b5305519ffc953377d8f6
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Wed Feb 7 16:21:39 2024 +0800
Refactor show processlist and kill logic (#30044)
* Refactor show processlist and kill logic
* Refactor show processlist and kill logic
* modify interrupted to final field
* refactor YamlProcess
* refactor YamlProcess
* fix unit test
* fix unit test
* fix checkstyle
* fix checkstyle
---
.../infra/executor/sql/process/Process.java | 31 +++++++++++++++++-----
.../executor/sql/process/yaml/YamlProcess.java | 2 ++
.../process/yaml/swapper/YamlProcessSwapper.java | 6 +++--
.../NewProcessListChangedSubscriber.java | 2 +-
.../subscriber/ProcessListChangedSubscriber.java | 4 +--
.../ProcessListChangedSubscriberTest.java | 4 ++-
.../executor/ShowProcessListExecutorTest.java | 3 ++-
7 files changed, 38 insertions(+), 14 deletions(-)
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index 8597a563fc4..dce76230e60 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -21,13 +21,17 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import java.sql.Statement;
import java.util.Collection;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -37,6 +41,8 @@ import java.util.concurrent.atomic.AtomicInteger;
@Getter
public final class Process {
+ private final Map<ExecutionUnit, Statement> processStatements = new
ConcurrentHashMap<>();
+
private final String id;
private final long startMillis;
@@ -51,14 +57,14 @@ public final class Process {
private final int totalUnitCount;
- private final Collection<Statement> processStatements;
-
private final AtomicInteger completedUnitCount;
private final boolean idle;
private final boolean heldByConnection;
+ private final AtomicBoolean interrupted;
+
public Process(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext, final boolean heldByConnection) {
this("", executionGroupContext, true, heldByConnection);
}
@@ -76,10 +82,11 @@ public final class Process {
username = null == grantee ? null : grantee.getUsername();
hostname = null == grantee ? null : grantee.getHostname();
totalUnitCount = getTotalUnitCount(executionGroupContext);
- processStatements = getProcessStatements(executionGroupContext);
+
processStatements.putAll(createProcessStatements(executionGroupContext));
completedUnitCount = new AtomicInteger(0);
this.idle = idle;
this.heldByConnection = heldByConnection;
+ interrupted = new AtomicBoolean();
}
private int getTotalUnitCount(final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext) {
@@ -90,12 +97,13 @@ public final class Process {
return result;
}
- private Collection<Statement> getProcessStatements(final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
- Collection<Statement> result = new LinkedList<>();
+ private Map<ExecutionUnit, Statement> createProcessStatements(final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
+ Map<ExecutionUnit, Statement> result = new LinkedHashMap<>();
for (ExecutionGroup<? extends SQLExecutionUnit> each :
executionGroupContext.getInputGroups()) {
for (SQLExecutionUnit executionUnit : each.getInputs()) {
if (executionUnit instanceof JDBCExecutionUnit) {
- result.add(((JDBCExecutionUnit)
executionUnit).getStorageResource());
+ JDBCExecutionUnit jdbcExecutionUnit = (JDBCExecutionUnit)
executionUnit;
+ result.put(jdbcExecutionUnit.getExecutionUnit(),
jdbcExecutionUnit.getStorageResource());
}
}
}
@@ -117,4 +125,13 @@ public final class Process {
public int getCompletedUnitCount() {
return completedUnitCount.get();
}
+
+ /**
+ * Get process statements.
+ *
+ * @return process statements
+ */
+ public Collection<Statement> getProcessStatements() {
+ return processStatements.values();
+ }
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
index 47dba29e7c6..2c33485f12a 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
@@ -47,4 +47,6 @@ public final class YamlProcess implements YamlConfiguration {
private boolean idle;
private boolean heldByConnection;
+
+ private boolean interrupted;
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index e32f9e63b4e..d733859f855 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
-import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -42,12 +42,14 @@ public final class YamlProcessSwapper implements
YamlConfigurationSwapper<YamlPr
result.setCompletedUnitCount(data.getCompletedUnitCount());
result.setIdle(data.isIdle());
result.setHeldByConnection(data.isHeldByConnection());
+ result.setInterrupted(data.getInterrupted().get());
return result;
}
@Override
public Process swapToObject(final YamlProcess yamlConfig) {
return new Process(yamlConfig.getId(), yamlConfig.getStartMillis(),
yamlConfig.getSql(), yamlConfig.getDatabaseName(), yamlConfig.getUsername(),
yamlConfig.getHostname(),
- yamlConfig.getTotalUnitCount(), Collections.emptyList(), new
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(),
yamlConfig.isHeldByConnection());
+ yamlConfig.getTotalUnitCount(), new
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(),
yamlConfig.isHeldByConnection(),
+ new AtomicBoolean(yamlConfig.isInterrupted()));
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
index f6aba3dd3d0..f927464bf73 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
@@ -40,7 +40,6 @@ import java.util.Collection;
* TODO replace the old ProcessListChangedSubscriber after meta data refactor
completed
* New process list changed subscriber.
*/
-@SuppressWarnings("UnstableApiUsage")
public final class NewProcessListChangedSubscriber {
private final NewRegistryCenter registryCenter;
@@ -96,6 +95,7 @@ public final class NewProcessListChangedSubscriber {
}
Process process =
ProcessRegistry.getInstance().get(event.getProcessId());
if (null != process) {
+ process.getInterrupted().set(true);
for (Statement each : process.getProcessStatements()) {
each.cancel();
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 42bf40f9ad7..18ac44232ec 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -27,8 +27,8 @@ import
org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
@@ -39,7 +39,6 @@ import java.util.Collection;
/**
* Process list changed subscriber.
*/
-@SuppressWarnings("UnstableApiUsage")
public final class ProcessListChangedSubscriber {
private final RegistryCenter registryCenter;
@@ -95,6 +94,7 @@ public final class ProcessListChangedSubscriber {
}
Process process =
ProcessRegistry.getInstance().get(event.getProcessId());
if (null != process) {
+ process.getInterrupted().set(true);
for (Statement each : process.getProcessStatements()) {
each.cancel();
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index e7adb0497c5..7dd15982013 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -108,11 +108,12 @@ class ProcessListChangedSubscriberTest {
@Test
void assertReportLocalProcesses() {
- String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
Process process = mock(Process.class);
String processId = "foo_id";
when(process.getId()).thenReturn(processId);
+ when(process.getInterrupted()).thenReturn(new AtomicBoolean());
ProcessRegistry.getInstance().add(process);
+ String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
subscriber.reportLocalProcesses(new
ReportLocalProcessesEvent(instanceId, processId));
ClusterPersistRepository repository = registryCenter.getRepository();
verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
@@ -120,6 +121,7 @@ class ProcessListChangedSubscriberTest {
+ " heldByConnection: false" + System.lineSeparator()
+ " id: foo_id" + System.lineSeparator()
+ " idle: false" + System.lineSeparator()
+ + " interrupted: false" + System.lineSeparator()
+ " startMillis: 0" + System.lineSeparator()
+ " totalUnitCount: 0" + System.lineSeparator());
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/" +
instanceId + ":foo_id");
diff --git
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index 158777ad094..04d8c60069c 100644
---
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -33,6 +33,7 @@ import org.mockito.internal.configuration.plugins.Plugins;
import java.sql.SQLException;
import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.is;
@@ -66,7 +67,7 @@ class ShowProcessListExecutorTest {
private void setupProcesses(final ShowProcessListExecutor
showProcessListExecutor) throws ReflectiveOperationException {
Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80",
1617939785160L,
- "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id",
"foo_db", "root", "127.0.0.1", 2, Collections.emptyList(), new
AtomicInteger(1), false, false);
+ "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id",
"foo_db", "root", "127.0.0.1", 2, new AtomicInteger(1), false, false, new
AtomicBoolean());
Plugins.getMemberAccessor().set(
showProcessListExecutor.getClass().getDeclaredField("processes"),
showProcessListExecutor, Collections.singleton(process));
}