This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5813555672f Support show processlist and kill for sql federation
(#30385)
5813555672f is described below
commit 5813555672f4f4b6a75d27e9824e2930d0e7b09c
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Mon Mar 4 17:11:55 2024 +0800
Support show processlist and kill for sql federation (#30385)
---
.../sqlfederation/engine/SQLFederationEngine.java | 29 +++++++++++++++-------
.../enumerable/EnumerableScanExecutor.java | 11 ++++----
2 files changed, 26 insertions(+), 14 deletions(-)
diff --git
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
index 77e6073c33f..624b07c19f9 100644
---
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
+++
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
@@ -34,6 +34,7 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
@@ -77,6 +78,8 @@ public final class SQLFederationEngine implements
AutoCloseable {
private static final int DEFAULT_METADATA_VERSION = 0;
+ private final ProcessEngine processEngine = new ProcessEngine();
+
@SuppressWarnings("rawtypes")
private final Map<ShardingSphereRule, SQLFederationDecider> deciders;
@@ -153,20 +156,12 @@ public final class SQLFederationEngine implements
AutoCloseable {
* @param federationContext federation context
* @return result set
*/
- @SuppressWarnings("unchecked")
public ResultSet executeQuery(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final JDBCExecutorCallback<? extends
ExecuteResult> callback, final SQLFederationExecutorContext federationContext) {
String databaseName =
federationContext.getQueryContext().getDatabaseNameFromSQLStatement().orElse(this.databaseName);
String schemaName =
federationContext.getQueryContext().getSchemaNameFromSQLStatement().orElse(this.schemaName);
SQLFederationExecutionPlan executionPlan = compileQuery(prepareEngine,
callback, federationContext, databaseName, schemaName);
- Bindable<Object> executablePlan =
EnumerableInterpretable.toBindable(Collections.emptyMap(), null,
(EnumerableRel) executionPlan.getPhysicalPlan(), EnumerableRel.Prefer.ARRAY);
- Map<String, Object> params =
createParameters(federationContext.getQueryContext().getParameters());
- OptimizerPlannerContext plannerContext =
sqlFederationRule.getOptimizerContext().getPlannerContext(databaseName);
- Enumerator<Object> enumerator = executablePlan.bind(new
SQLFederationDataContext(plannerContext.getValidator(schemaName),
plannerContext.getConverter(schemaName), params)).enumerator();
- ShardingSphereSchema schema =
federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
- Schema sqlFederationSchema =
plannerContext.getValidator(schemaName).getCatalogReader().getRootSchema().plus().getSubSchema(schemaName);
- resultSet = new SQLFederationResultSet(enumerator, schema,
sqlFederationSchema, (SelectStatementContext)
federationContext.getQueryContext().getSqlStatementContext(),
- executionPlan.getResultColumnType());
+ resultSet = executePlan(federationContext, executionPlan,
databaseName, schemaName);
return resultSet;
}
@@ -185,6 +180,22 @@ public final class SQLFederationEngine implements
AutoCloseable {
return compilerEngine.compile(buildCacheKey(federationContext,
selectStatementContext, sqlStatementCompiler, databaseName, schemaName), false);
}
+ @SuppressWarnings("unchecked")
+ private ResultSet executePlan(final SQLFederationExecutorContext
federationContext, final SQLFederationExecutionPlan executionPlan, final String
databaseName, final String schemaName) {
+ try {
+ Bindable<Object> executablePlan =
EnumerableInterpretable.toBindable(Collections.emptyMap(), null,
(EnumerableRel) executionPlan.getPhysicalPlan(), EnumerableRel.Prefer.ARRAY);
+ Map<String, Object> params =
createParameters(federationContext.getQueryContext().getParameters());
+ OptimizerPlannerContext plannerContext =
sqlFederationRule.getOptimizerContext().getPlannerContext(databaseName);
+ Enumerator<Object> enumerator = executablePlan.bind(new
SQLFederationDataContext(plannerContext.getValidator(schemaName),
plannerContext.getConverter(schemaName), params)).enumerator();
+ ShardingSphereSchema schema =
federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
+ Schema sqlFederationSchema =
plannerContext.getValidator(schemaName).getCatalogReader().getRootSchema().plus().getSubSchema(schemaName);
+ return new SQLFederationResultSet(enumerator, schema,
sqlFederationSchema, (SelectStatementContext)
federationContext.getQueryContext().getSqlStatementContext(),
+ executionPlan.getResultColumnType());
+ } finally {
+
processEngine.completeSQLExecution(federationContext.getProcessId());
+ }
+ }
+
private ExecutionPlanCacheKey buildCacheKey(final
SQLFederationExecutorContext federationContext, final SelectStatementContext
selectStatementContext,
final SQLStatementCompiler
sqlStatementCompiler, final String databaseName, final String schemaName) {
ShardingSphereSchema schema =
federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
index 0e06541e85c..d5937649280 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
@@ -30,6 +30,8 @@ import
org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import
org.apache.shardingsphere.infra.database.core.metadata.database.system.SystemDatabase;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
+import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.infra.executor.exception.SQLExecutionInterruptedException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
@@ -42,6 +44,7 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
@@ -138,11 +141,7 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
federationContext.getPreviewExecutionUnits().addAll(context.getExecutionUnits());
return createEmptyEnumerable();
}
- try {
- return createEnumerable(queryContext, database, context);
- } finally {
-
processEngine.completeSQLExecution(federationContext.getProcessId());
- }
+ return createEnumerable(queryContext, database, context);
}
private AbstractEnumerable<Object> createEnumerable(final QueryContext
queryContext, final ShardingSphereDatabase database, final ExecutionContext
context) {
@@ -156,6 +155,8 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext
= prepareEngine.prepare(context.getRouteContext(),
executorContext.getConnectionOffsets(), context.getExecutionUnits(),
new
ExecutionGroupReportContext(executorContext.getFederationContext().getProcessId(),
database.getName(), new Grantee("", "")));
setParameters(executionGroupContext.getInputGroups());
+
ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(executorContext.getFederationContext().getProcessId()).isInterrupted(),
+ SQLExecutionInterruptedException::new);
processEngine.executeSQL(executionGroupContext,
executorContext.getFederationContext().getQueryContext());
List<QueryResult> queryResults =
jdbcExecutor.execute(executionGroupContext,
callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
MergeEngine mergeEngine = new MergeEngine(database,
executorContext.getProps(), new ConnectionContext());