This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d26a383e57 Refactor show processlist and kill logic (#30075)
6d26a383e57 is described below

commit 6d26a383e5712ae738395667e90e4317f313840e
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Feb 8 16:53:59 2024 +0800

    Refactor show processlist and kill logic (#30075)
---
 .../user-manual/error-code/sql-error-code.cn.md    |  1 +
 .../user-manual/error-code/sql-error-code.en.md    |  1 +
 .../SQLExecutionInterruptedException.java          | 33 ++++++++++++++++++++++
 .../engine/driver/jdbc/JDBCExecutorCallback.java   |  2 +-
 .../raw/callback/RawSQLExecutorCallback.java       |  4 +--
 .../infra/executor/sql/process/Process.java        | 33 ++++++++++++++--------
 .../infra/executor/sql/process/ProcessEngine.java  | 13 +++++++--
 .../executor/sql/process/ProcessRegistry.java      | 27 ++++++++++++++++++
 .../process/yaml/swapper/YamlProcessSwapper.java   |  2 +-
 .../executor/sql/process/ProcessEngineTest.java    |  4 +--
 .../NewProcessListChangedSubscriber.java           |  2 +-
 .../subscriber/ProcessListChangedSubscriber.java   |  2 +-
 .../ProcessListChangedSubscriberTest.java          |  2 +-
 .../subscriber/StandaloneProcessSubscriber.java    |  2 +-
 14 files changed, 104 insertions(+), 24 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 2d8149b54d9..39d90ec239a 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -62,6 +62,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | 08000     | 13020       | Can not get %d connections one time, partition 
succeed connection(%d) have released. Please consider increasing the 
\`maxPoolSize\` of the data sources or decreasing the 
\`max-connections-size-per-query\` in properties. |
 | 08000     | 13030       | Connection has been closed.                        
                                                                                
                                                                                
         |
 | 08000     | 13031       | Result set has been closed.                        
                                                                                
                                                                                
         |
+| 08000     | 13032       | SQL execution has been interrupted.                
                                                                                
                                                                                
         |
 | HY000     | 13090       | Load datetime from database failed, reason: %s     
                                                                                
                                                                                
         |
 
 ### 事务
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index ee3f90f4c43..f4b6a98dc38 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -62,6 +62,7 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
 | 08000     | 13020       | Can not get %d connections one time, partition 
succeed connection(%d) have released. Please consider increasing the 
\`maxPoolSize\` of the data sources or decreasing the 
\`max-connections-size-per-query\` in properties. |
 | 08000     | 13030       | Connection has been closed.                        
                                                                                
                                                                                
         |
 | 08000     | 13031       | Result set has been closed.                        
                                                                                
                                                                                
         |
+| 08000     | 13032       | SQL execution has been interrupted.                
                                                                                
                                                                                
         |
 | HY000     | 13090       | Load datetime from database failed, reason: %s     
                                                                                
                                                                                
         |
 
 ### Transaction
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/exception/SQLExecutionInterruptedException.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/exception/SQLExecutionInterruptedException.java
new file mode 100644
index 00000000000..b576f338ef8
--- /dev/null
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/exception/SQLExecutionInterruptedException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.executor.exception;
+
+import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
+import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.ConnectionSQLException;
+
+/**
+ * SQL execution interrupted exception.
+ */
+public final class SQLExecutionInterruptedException extends 
ConnectionSQLException {
+    
+    private static final long serialVersionUID = 3394283296623445981L;
+    
+    public SQLExecutionInterruptedException() {
+        super(XOpenSQLState.CONNECTION_EXCEPTION, 32, "SQL execution has been 
interrupted.");
+    }
+}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index e675e944a7f..bbdcbf5b1a2 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -82,7 +82,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
             
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(),
 sqlUnit.getSql(), sqlUnit.getParameters(), connectionProps, isTrunkThread);
             T result = executeSQL(sqlUnit.getSql(), 
jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), 
storageType);
             sqlExecutionHook.finishSuccess();
-            processEngine.completeSQLUnitExecution();
+            processEngine.completeSQLUnitExecution(jdbcExecutionUnit);
             return result;
         } catch (final SQLException ex) {
             if (!storageType.equals(protocolType)) {
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 0baa68dcd8e..338962123f2 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -48,8 +48,8 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec
     public Collection<ExecuteResult> execute(final 
Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread) throws 
SQLException {
         Collection<ExecuteResult> result = 
callbacks.iterator().next().execute(inputs, isTrunkThread);
         if (!ProcessIdContext.isEmpty()) {
-            for (int i = 0; i < inputs.size(); i++) {
-                processEngine.completeSQLUnitExecution();
+            for (RawSQLExecutionUnit each : inputs) {
+                processEngine.completeSQLUnitExecution(each);
             }
         }
         return result;
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index 517cbb19186..8de45e49121 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 
 import java.sql.Statement;
-import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -77,8 +76,8 @@ public final class Process {
         this.sql = sql;
         databaseName = 
executionGroupContext.getReportContext().getDatabaseName();
         Grantee grantee = 
executionGroupContext.getReportContext().getGrantee();
-        username = null == grantee ? null : grantee.getUsername();
-        hostname = null == grantee ? null : grantee.getHostname();
+        username = null == grantee ? "" : grantee.getUsername();
+        hostname = null == grantee ? "" : grantee.getHostname();
         totalUnitCount = getTotalUnitCount(executionGroupContext);
         
processStatements.putAll(createProcessStatements(executionGroupContext));
         completedUnitCount = new AtomicInteger(0);
@@ -116,7 +115,7 @@ public final class Process {
     
     /**
      * Get completed unit count.
-     * 
+     *
      * @return completed unit count
      */
     public int getCompletedUnitCount() {
@@ -124,17 +123,17 @@ public final class Process {
     }
     
     /**
-     * Get interrupted.
+     * Is interrupted.
      *
      * @return interrupted
      */
-    public boolean getInterrupted() {
+    public boolean isInterrupted() {
         return interrupted.get();
     }
     
     /**
      * Set interrupted.
-     * 
+     *
      * @param interrupted interrupted
      */
     public void setInterrupted(final boolean interrupted) {
@@ -142,11 +141,21 @@ public final class Process {
     }
     
     /**
-     * Get process statements.
-     * 
-     * @return process statements
+     * Put process statement.
+     *
+     * @param executionUnit execution unit
+     * @param statement statement
+     */
+    public void putProcessStatement(final ExecutionUnit executionUnit, final 
Statement statement) {
+        processStatements.put(executionUnit, statement);
+    }
+    
+    /**
+     * Remove process statement.
+     *
+     * @param executionUnit execution unit
      */
-    public Collection<Statement> getProcessStatements() {
-        return processStatements.values();
+    public void removeProcessStatement(final ExecutionUnit executionUnit) {
+        processStatements.remove(executionUnit);
     }
 }
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index a5d3e3ee48a..4a24993ae1b 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -44,6 +44,7 @@ public final class ProcessEngine {
      * @return process ID
      */
     public String connect(final Grantee grantee, final String databaseName) {
+        // TODO remove processId return value, and use ProcessIdContext.get() 
instead
         String processId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
         ProcessIdContext.set(processId);
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext =
@@ -59,6 +60,7 @@ public final class ProcessEngine {
      * @param processId process ID
      */
     public void disconnect(final String processId) {
+        // TODO remove processId parameter, and use ProcessIdContext.get() 
instead
         ProcessRegistry.getInstance().remove(processId);
         ProcessIdContext.remove();
     }
@@ -77,12 +79,19 @@ public final class ProcessEngine {
     
     /**
      * Complete SQL unit execution.
+     * 
+     * @param executionUnit execution unit
      */
-    public void completeSQLUnitExecution() {
+    public void completeSQLUnitExecution(final SQLExecutionUnit executionUnit) 
{
         if (ProcessIdContext.isEmpty()) {
             return;
         }
-        
ProcessRegistry.getInstance().get(ProcessIdContext.get()).completeExecutionUnit();
+        Process process = 
ProcessRegistry.getInstance().get(ProcessIdContext.get());
+        if (null == process) {
+            return;
+        }
+        process.completeExecutionUnit();
+        process.removeProcessStatement(executionUnit.getExecutionUnit());
     }
     
     /**
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
index 0e569e24753..fcad368fa8c 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
@@ -17,12 +17,17 @@
 
 package org.apache.shardingsphere.infra.executor.sql.process;
 
+import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import 
org.apache.shardingsphere.infra.executor.exception.SQLExecutionInterruptedException;
 
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Process registry.
@@ -49,6 +54,28 @@ public final class ProcessRegistry {
      * @param process process
      */
     public void add(final Process process) {
+        if (isSameExecutionProcess(process)) {
+            Process oldProcess = processes.get(process.getId());
+            
ShardingSpherePreconditions.checkState(!oldProcess.isInterrupted(), 
SQLExecutionInterruptedException::new);
+            merge(oldProcess, process);
+            return;
+        }
+        processes.put(process.getId(), process);
+    }
+    
+    private boolean isSameExecutionProcess(final Process process) {
+        return !Strings.isNullOrEmpty(process.getSql()) && 
processes.containsKey(process.getId()) && 
processes.get(process.getId()).getSql().equalsIgnoreCase(process.getSql());
+    }
+    
+    private void merge(final Process oldProcess, final Process newProcess) {
+        int totalUnitCount = oldProcess.getTotalUnitCount() + 
newProcess.getTotalUnitCount();
+        int completedUnitCount = oldProcess.getCompletedUnitCount() + 
newProcess.getCompletedUnitCount();
+        boolean idle = oldProcess.isIdle() || newProcess.isIdle();
+        boolean interrupted = oldProcess.isInterrupted() || 
newProcess.isInterrupted();
+        Process process = new Process(oldProcess.getId(), 
oldProcess.getStartMillis(), oldProcess.getSql(), oldProcess.getDatabaseName(),
+                oldProcess.getUsername(), oldProcess.getHostname(), 
totalUnitCount, new AtomicInteger(completedUnitCount), idle, new 
AtomicBoolean(interrupted));
+        
oldProcess.getProcessStatements().forEach(process::putProcessStatement);
+        
newProcess.getProcessStatements().forEach(process::putProcessStatement);
         processes.put(process.getId(), process);
     }
     
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index 9cfa079314d..b6cc81e4a9b 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -41,7 +41,7 @@ public final class YamlProcessSwapper implements YamlConfigurationSwapper<YamlPr
         result.setTotalUnitCount(data.getTotalUnitCount());
         result.setCompletedUnitCount(data.getCompletedUnitCount());
         result.setIdle(data.isIdle());
-        result.setInterrupted(data.getInterrupted());
+        result.setInterrupted(data.isInterrupted());
         return result;
     }
     
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
index e246bf0e30b..f3eb13cc104 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.shardingsphere.infra.executor.sql.process;
 
-import org.apache.shardingsphere.infra.session.query.QueryContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.UpdateStatementContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.SetAssignmentSegment;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
@@ -81,7 +81,7 @@ class ProcessEngineTest {
     void assertCompleteSQLUnitExecution() {
         ProcessIdContext.set("foo_id");
         when(processRegistry.get("foo_id")).thenReturn(mock(Process.class));
-        new ProcessEngine().completeSQLUnitExecution();
+        new 
ProcessEngine().completeSQLUnitExecution(mock(SQLExecutionUnit.class));
         verify(processRegistry).get("foo_id");
         ProcessIdContext.remove();
     }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
index 64cd1d938a5..df07fd4b769 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/NewProcessListChangedSubscriber.java
@@ -96,7 +96,7 @@ public final class NewProcessListChangedSubscriber {
         Process process = 
ProcessRegistry.getInstance().get(event.getProcessId());
         if (null != process) {
             process.setInterrupted(true);
-            for (Statement each : process.getProcessStatements()) {
+            for (Statement each : process.getProcessStatements().values()) {
                 each.cancel();
             }
         }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 6efd5045f0c..4c094f18a61 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -95,7 +95,7 @@ public final class ProcessListChangedSubscriber {
         Process process = 
ProcessRegistry.getInstance().get(event.getProcessId());
         if (null != process) {
             process.setInterrupted(true);
-            for (Statement each : process.getProcessStatements()) {
+            for (Statement each : process.getProcessStatements().values()) {
                 each.cancel();
             }
         }
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 6f38546e90e..a0ddbfaf5f1 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -111,7 +111,7 @@ class ProcessListChangedSubscriberTest {
         Process process = mock(Process.class);
         String processId = "foo_id";
         when(process.getId()).thenReturn(processId);
-        when(process.getInterrupted()).thenReturn(false);
+        when(process.isInterrupted()).thenReturn(false);
         ProcessRegistry.getInstance().add(process);
         String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
         subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent(instanceId, processId));
diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
index 2063cfb6bea..88033a0dd9a 100644
--- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
+++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
@@ -55,7 +55,7 @@ public final class StandaloneProcessSubscriber implements ProcessSubscriber {
         if (null == process) {
             return;
         }
-        for (Statement each : process.getProcessStatements()) {
+        for (Statement each : process.getProcessStatements().values()) {
             each.cancel();
         }
     }

Reply via email to