This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5813555672f Support show processlist and kill for sql federation 
(#30385)
5813555672f is described below

commit 5813555672f4f4b6a75d27e9824e2930d0e7b09c
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Mon Mar 4 17:11:55 2024 +0800

    Support show processlist and kill for sql federation (#30385)
---
 .../sqlfederation/engine/SQLFederationEngine.java  | 29 +++++++++++++++-------
 .../enumerable/EnumerableScanExecutor.java         | 11 ++++----
 2 files changed, 26 insertions(+), 14 deletions(-)

diff --git 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
index 77e6073c33f..624b07c19f9 100644
--- 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
+++ 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
@@ -34,6 +34,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
@@ -77,6 +78,8 @@ public final class SQLFederationEngine implements 
AutoCloseable {
     
     private static final int DEFAULT_METADATA_VERSION = 0;
     
+    private final ProcessEngine processEngine = new ProcessEngine();
+    
     @SuppressWarnings("rawtypes")
     private final Map<ShardingSphereRule, SQLFederationDecider> deciders;
     
@@ -153,20 +156,12 @@ public final class SQLFederationEngine implements 
AutoCloseable {
      * @param federationContext federation context
      * @return result set
      */
-    @SuppressWarnings("unchecked")
     public ResultSet executeQuery(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                                   final JDBCExecutorCallback<? extends 
ExecuteResult> callback, final SQLFederationExecutorContext federationContext) {
         String databaseName = 
federationContext.getQueryContext().getDatabaseNameFromSQLStatement().orElse(this.databaseName);
         String schemaName = 
federationContext.getQueryContext().getSchemaNameFromSQLStatement().orElse(this.schemaName);
         SQLFederationExecutionPlan executionPlan = compileQuery(prepareEngine, 
callback, federationContext, databaseName, schemaName);
-        Bindable<Object> executablePlan = 
EnumerableInterpretable.toBindable(Collections.emptyMap(), null, 
(EnumerableRel) executionPlan.getPhysicalPlan(), EnumerableRel.Prefer.ARRAY);
-        Map<String, Object> params = 
createParameters(federationContext.getQueryContext().getParameters());
-        OptimizerPlannerContext plannerContext = 
sqlFederationRule.getOptimizerContext().getPlannerContext(databaseName);
-        Enumerator<Object> enumerator = executablePlan.bind(new 
SQLFederationDataContext(plannerContext.getValidator(schemaName), 
plannerContext.getConverter(schemaName), params)).enumerator();
-        ShardingSphereSchema schema = 
federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
-        Schema sqlFederationSchema = 
plannerContext.getValidator(schemaName).getCatalogReader().getRootSchema().plus().getSubSchema(schemaName);
-        resultSet = new SQLFederationResultSet(enumerator, schema, 
sqlFederationSchema, (SelectStatementContext) 
federationContext.getQueryContext().getSqlStatementContext(),
-                executionPlan.getResultColumnType());
+        resultSet = executePlan(federationContext, executionPlan, 
databaseName, schemaName);
         return resultSet;
     }
     
@@ -185,6 +180,22 @@ public final class SQLFederationEngine implements 
AutoCloseable {
         return compilerEngine.compile(buildCacheKey(federationContext, 
selectStatementContext, sqlStatementCompiler, databaseName, schemaName), false);
     }
     
+    @SuppressWarnings("unchecked")
+    private ResultSet executePlan(final SQLFederationExecutorContext 
federationContext, final SQLFederationExecutionPlan executionPlan, final String 
databaseName, final String schemaName) {
+        try {
+            Bindable<Object> executablePlan = 
EnumerableInterpretable.toBindable(Collections.emptyMap(), null, 
(EnumerableRel) executionPlan.getPhysicalPlan(), EnumerableRel.Prefer.ARRAY);
+            Map<String, Object> params = 
createParameters(federationContext.getQueryContext().getParameters());
+            OptimizerPlannerContext plannerContext = 
sqlFederationRule.getOptimizerContext().getPlannerContext(databaseName);
+            Enumerator<Object> enumerator = executablePlan.bind(new 
SQLFederationDataContext(plannerContext.getValidator(schemaName), 
plannerContext.getConverter(schemaName), params)).enumerator();
+            ShardingSphereSchema schema = 
federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
+            Schema sqlFederationSchema = 
plannerContext.getValidator(schemaName).getCatalogReader().getRootSchema().plus().getSubSchema(schemaName);
+            return new SQLFederationResultSet(enumerator, schema, 
sqlFederationSchema, (SelectStatementContext) 
federationContext.getQueryContext().getSqlStatementContext(),
+                    executionPlan.getResultColumnType());
+        } finally {
+            
processEngine.completeSQLExecution(federationContext.getProcessId());
+        }
+    }
+    
     private ExecutionPlanCacheKey buildCacheKey(final 
SQLFederationExecutorContext federationContext, final SelectStatementContext 
selectStatementContext,
                                                 final SQLStatementCompiler 
sqlStatementCompiler, final String databaseName, final String schemaName) {
         ShardingSphereSchema schema = 
federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
index 0e06541e85c..d5937649280 100644
--- 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
+++ 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
@@ -30,6 +30,8 @@ import 
org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.system.SystemDatabase;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import 
org.apache.shardingsphere.infra.executor.exception.SQLExecutionInterruptedException;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
@@ -42,6 +44,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
@@ -138,11 +141,7 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
             
federationContext.getPreviewExecutionUnits().addAll(context.getExecutionUnits());
             return createEmptyEnumerable();
         }
-        try {
-            return createEnumerable(queryContext, database, context);
-        } finally {
-            
processEngine.completeSQLExecution(federationContext.getProcessId());
-        }
+        return createEnumerable(queryContext, database, context);
     }
     
     private AbstractEnumerable<Object> createEnumerable(final QueryContext 
queryContext, final ShardingSphereDatabase database, final ExecutionContext 
context) {
@@ -156,6 +155,8 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
                 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext 
= prepareEngine.prepare(context.getRouteContext(), 
executorContext.getConnectionOffsets(), context.getExecutionUnits(),
                         new 
ExecutionGroupReportContext(executorContext.getFederationContext().getProcessId(),
 database.getName(), new Grantee("", "")));
                 setParameters(executionGroupContext.getInputGroups());
+                
ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(executorContext.getFederationContext().getProcessId()).isInterrupted(),
+                        SQLExecutionInterruptedException::new);
                 processEngine.executeSQL(executionGroupContext, 
executorContext.getFederationContext().getQueryContext());
                 List<QueryResult> queryResults = 
jdbcExecutor.execute(executionGroupContext, 
callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
                 MergeEngine mergeEngine = new MergeEngine(database, 
executorContext.getProps(), new ConnectionContext());

Reply via email to