This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 195f72dac20 Refactor show processlist and kill logic (#30044)
195f72dac20 is described below

commit 195f72dac209e4d4fb4b5305519ffc953377d8f6
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Wed Feb 7 16:21:39 2024 +0800

    Refactor show processlist and kill logic (#30044)
    
    * Refactor show processlist and kill logic
    
    * Refactor show processlist and kill logic
    
    * modify interrupted to final field
    
    * refactor YamlProcess
    
    * refactor YamlProcess
    
    * fix unit test
    
    * fix unit test
    
    * fix checkstyle
    
    * fix checkstyle
---
 .../infra/executor/sql/process/Process.java        | 31 +++++++++++++++++-----
 .../executor/sql/process/yaml/YamlProcess.java     |  2 ++
 .../process/yaml/swapper/YamlProcessSwapper.java   |  6 +++--
 .../NewProcessListChangedSubscriber.java           |  2 +-
 .../subscriber/ProcessListChangedSubscriber.java   |  4 +--
 .../ProcessListChangedSubscriberTest.java          |  4 ++-
 .../executor/ShowProcessListExecutorTest.java      |  3 ++-
 7 files changed, 38 insertions(+), 14 deletions(-)

diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index 8597a563fc4..dce76230e60 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -21,13 +21,17 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 
 import java.sql.Statement;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -37,6 +41,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 @Getter
 public final class Process {
     
+    private final Map<ExecutionUnit, Statement> processStatements = new 
ConcurrentHashMap<>();
+    
     private final String id;
     
     private final long startMillis;
@@ -51,14 +57,14 @@ public final class Process {
     
     private final int totalUnitCount;
     
-    private final Collection<Statement> processStatements;
-    
     private final AtomicInteger completedUnitCount;
     
     private final boolean idle;
     
     private final boolean heldByConnection;
     
+    private final AtomicBoolean interrupted;
+    
     public Process(final ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext, final boolean heldByConnection) {
         this("", executionGroupContext, true, heldByConnection);
     }
@@ -76,10 +82,11 @@ public final class Process {
         username = null == grantee ? null : grantee.getUsername();
         hostname = null == grantee ? null : grantee.getHostname();
         totalUnitCount = getTotalUnitCount(executionGroupContext);
-        processStatements = getProcessStatements(executionGroupContext);
+        
processStatements.putAll(createProcessStatements(executionGroupContext));
         completedUnitCount = new AtomicInteger(0);
         this.idle = idle;
         this.heldByConnection = heldByConnection;
+        interrupted = new AtomicBoolean();
     }
     
     private int getTotalUnitCount(final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext) {
@@ -90,12 +97,13 @@ public final class Process {
         return result;
     }
     
-    private Collection<Statement> getProcessStatements(final 
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
-        Collection<Statement> result = new LinkedList<>();
+    private Map<ExecutionUnit, Statement> createProcessStatements(final 
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
+        Map<ExecutionUnit, Statement> result = new LinkedHashMap<>();
         for (ExecutionGroup<? extends SQLExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
             for (SQLExecutionUnit executionUnit : each.getInputs()) {
                 if (executionUnit instanceof JDBCExecutionUnit) {
-                    result.add(((JDBCExecutionUnit) 
executionUnit).getStorageResource());
+                    JDBCExecutionUnit jdbcExecutionUnit = (JDBCExecutionUnit) 
executionUnit;
+                    result.put(jdbcExecutionUnit.getExecutionUnit(), 
jdbcExecutionUnit.getStorageResource());
                 }
             }
         }
@@ -117,4 +125,13 @@ public final class Process {
     public int getCompletedUnitCount() {
         return completedUnitCount.get();
     }
+    
+    /**
+     * Get process statements.
+     * 
+     * @return process statements
+     */
+    public Collection<Statement> getProcessStatements() {
+        return processStatements.values();
+    }
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
index 47dba29e7c6..2c33485f12a 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
@@ -47,4 +47,6 @@ public final class YamlProcess implements YamlConfiguration {
     private boolean idle;
     
     private boolean heldByConnection;
+    
+    private boolean interrupted;
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index e32f9e63b4e..d733859f855 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
-import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -42,12 +42,14 @@ public final class YamlProcessSwapper implements 
YamlConfigurationSwapper<YamlPr
         result.setCompletedUnitCount(data.getCompletedUnitCount());
         result.setIdle(data.isIdle());
         result.setHeldByConnection(data.isHeldByConnection());
+        result.setInterrupted(data.getInterrupted().get());
         return result;
     }
     
     @Override
     public Process swapToObject(final YamlProcess yamlConfig) {
         return new Process(yamlConfig.getId(), yamlConfig.getStartMillis(), 
yamlConfig.getSql(), yamlConfig.getDatabaseName(), yamlConfig.getUsername(), 
yamlConfig.getHostname(),
-                yamlConfig.getTotalUnitCount(), Collections.emptyList(), new 
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(), 
yamlConfig.isHeldByConnection());
+                yamlConfig.getTotalUnitCount(), new 
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(), 
yamlConfig.isHeldByConnection(),
+                new AtomicBoolean(yamlConfig.isInterrupted()));
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
index f6aba3dd3d0..f927464bf73 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
@@ -40,7 +40,6 @@ import java.util.Collection;
  * TODO replace the old ProcessListChangedSubscriber after meta data refactor 
completed
  * New process list changed subscriber.
  */
-@SuppressWarnings("UnstableApiUsage")
 public final class NewProcessListChangedSubscriber {
     
     private final NewRegistryCenter registryCenter;
@@ -96,6 +95,7 @@ public final class NewProcessListChangedSubscriber {
         }
         Process process = 
ProcessRegistry.getInstance().get(event.getProcessId());
         if (null != process) {
+            process.getInterrupted().set(true);
             for (Statement each : process.getProcessStatements()) {
                 each.cancel();
             }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 42bf40f9ad7..18ac44232ec 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -27,8 +27,8 @@ import 
org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
 
@@ -39,7 +39,6 @@ import java.util.Collection;
 /**
  * Process list changed subscriber.
  */
-@SuppressWarnings("UnstableApiUsage")
 public final class ProcessListChangedSubscriber {
     
     private final RegistryCenter registryCenter;
@@ -95,6 +94,7 @@ public final class ProcessListChangedSubscriber {
         }
         Process process = 
ProcessRegistry.getInstance().get(event.getProcessId());
         if (null != process) {
+            process.getInterrupted().set(true);
             for (Statement each : process.getProcessStatements()) {
                 each.cancel();
             }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index e7adb0497c5..7dd15982013 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -108,11 +108,12 @@ class ProcessListChangedSubscriberTest {
     
     @Test
     void assertReportLocalProcesses() {
-        String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
         Process process = mock(Process.class);
         String processId = "foo_id";
         when(process.getId()).thenReturn(processId);
+        when(process.getInterrupted()).thenReturn(new AtomicBoolean());
         ProcessRegistry.getInstance().add(process);
+        String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
         subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent(instanceId, processId));
         ClusterPersistRepository repository = registryCenter.getRepository();
         verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
@@ -120,6 +121,7 @@ class ProcessListChangedSubscriberTest {
                         + "  heldByConnection: false" + System.lineSeparator()
                         + "  id: foo_id" + System.lineSeparator()
                         + "  idle: false" + System.lineSeparator()
+                        + "  interrupted: false" + System.lineSeparator()
                         + "  startMillis: 0" + System.lineSeparator()
                         + "  totalUnitCount: 0" + System.lineSeparator());
         
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/" + 
instanceId + ":foo_id");
diff --git 
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
 
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index 158777ad094..04d8c60069c 100644
--- 
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++ 
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -33,6 +33,7 @@ import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -66,7 +67,7 @@ class ShowProcessListExecutorTest {
     
     private void setupProcesses(final ShowProcessListExecutor 
showProcessListExecutor) throws ReflectiveOperationException {
         Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80", 
1617939785160L,
-                "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", 
"foo_db", "root", "127.0.0.1", 2, Collections.emptyList(), new 
AtomicInteger(1), false, false);
+                "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", 
"foo_db", "root", "127.0.0.1", 2, new AtomicInteger(1), false, false, new 
AtomicBoolean());
         Plugins.getMemberAccessor().set(
                 
showProcessListExecutor.getClass().getDeclaredField("processes"), 
showProcessListExecutor, Collections.singleton(process));
     }

Reply via email to