This is an automated email from the ASF dual-hosted git repository. duanzhengqiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 344c41d9c43 Add new SQLFederationRelConverter to simplify engine logic and refactor sql federation executor package (#35483) 344c41d9c43 is described below commit 344c41d9c4347bab1cd2aa36f2920ef8156996a8 Author: Zhengqiang Duan <duanzhengqi...@apache.org> AuthorDate: Thu May 22 17:15:08 2025 +0800 Add new SQLFederationRelConverter to simplify engine logic and refactor sql federation executor package (#35483) * Add new SQLFederationRelConverter to simplify engine logic * move LogicalScanPushDownRelBuilder and LogicalScanRelRewriter to rel package * refactor sql federation executor context * refactor sql federation executor package * fix checkstyle --- .../executor/engine/DriverExecuteExecutor.java | 2 +- .../engine/DriverExecuteQueryExecutor.java | 2 +- .../compiler/compiler/SQLStatementCompiler.java | 21 ++-- .../ScanImplementor.java} | 10 +- .../ScanImplementorContext.java} | 6 +- .../enumerator/EmptyDataRowEnumerator.java} | 6 +- .../metadata/schema/SQLFederationTable.java | 22 ++-- .../builder}/LogicalScanPushDownRelBuilder.java | 2 +- .../converter/SQLFederationRelConverter.java} | 137 +++++++++++---------- .../rewriter/LogicalScanRelRewriter.java} | 14 +-- .../compiler/sql/operator/logical/LogicalScan.java | 2 +- .../sql/operator/physical/EnumerableScan.java | 2 +- .../compiler/it/SQLStatementCompilerIT.java | 32 ++--- .../context/SQLFederationContext.java | 2 +- .../sqlfederation/engine/SQLFederationEngine.java | 51 +++----- .../engine/processor/SQLFederationProcessor.java | 6 +- .../processor/SQLFederationProcessorFactory.java | 6 +- .../impl/StandardSQLFederationProcessor.java | 36 +++--- ...onBindContext.java => ExecutorBindContext.java} | 10 +- .../executor/context/ExecutorContext.java | 65 ++++++++++ .../context/SQLFederationExecutorContext.java | 41 ------ .../enumerator/jdbc/JDBCDataRowEnumerator.java} | 6 +- .../memory/MemoryDataRowEnumerator.java} | 13 +- .../memory/MemoryDataTypeConverter.java} | 4 +- .../memory/MemoryTableStatisticsBuilder.java} | 56 +++++---- .../EnumerableScanImplementor.java} | 96 ++++++--------- .../EnumerableScanImplementorTest.java} | 44 ++++--- .../connector/StandardDatabaseConnector.java | 2 +- .../handler/distsql/rul/PreviewExecutor.java | 2 +- .../OpenGaussSystemCatalogAdminQueryExecutor.java | 2 +- 30 files changed, 352 insertions(+), 348 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java index 0de94186204..5713d36a04c 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java @@ -40,7 +40,7 @@ import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.mode.metadata.refresher.federation.FederationMetaDataRefreshEngine; import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.TCLStatement; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; +import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext; import java.sql.Connection; import java.sql.ResultSet; diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java index e5becef2394..f8a66409307 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java @@ -32,7 +32,7 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute; import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; +import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext; import java.sql.Connection; import java.sql.ResultSet; diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/SQLStatementCompiler.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/SQLStatementCompiler.java index e30f4b2d6ba..c34e99c5606 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/SQLStatementCompiler.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/SQLStatementCompiler.java @@ -25,14 +25,12 @@ import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQueryBase; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement; import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan; import org.apache.shardingsphere.sqlfederation.compiler.planner.builder.SQLFederationPlannerBuilder; +import org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter; +import org.apache.shardingsphere.sqlfederation.compiler.rel.rewriter.LogicalScanRelRewriter; import org.apache.shardingsphere.sqlfederation.compiler.sql.ast.converter.SQLNodeConverterEngine; -import org.apache.shardingsphere.sqlfederation.compiler.sql.operator.util.LogicalScanRelShuttle; - -import java.util.Objects; /** * SQL statement compiler. @@ -40,7 +38,7 @@ import java.util.Objects; @RequiredArgsConstructor public final class SQLStatementCompiler { - private final SqlToRelConverter converter; + private final SQLFederationRelConverter converter; private final Convention convention; @@ -55,20 +53,21 @@ public final class SQLStatementCompiler { RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.DEFAULT); SqlNode sqlNode = SQLNodeConverterEngine.convert(sqlStatement); RelNode logicalPlan = converter.convertQuery(sqlNode, true, true).rel; - RelDataType resultColumnType = Objects.requireNonNull(converter.validator).getValidatedNodeType(sqlNode); - RelNode replacePlan = LogicalScanRelShuttle.replace(logicalPlan, databaseType); - RelNode rewritePlan = rewrite(replacePlan, SQLFederationPlannerBuilder.buildHepPlanner()); + RelDataType resultColumnType = converter.getValidatedNodeType(sqlNode); + RelNode rewritePlan = rewrite(logicalPlan, databaseType); RelNode physicalPlan = optimize(rewritePlan, converter); RelMetadataQueryBase.THREAD_PROVIDERS.remove(); return new SQLFederationExecutionPlan(physicalPlan, resultColumnType); } - private RelNode rewrite(final RelNode logicalPlan, final RelOptPlanner hepPlanner) { - hepPlanner.setRoot(logicalPlan); + private RelNode rewrite(final RelNode logicalPlan, final String databaseType) { + RelNode rewrittenPlan = LogicalScanRelRewriter.rewrite(logicalPlan, databaseType); + RelOptPlanner hepPlanner = SQLFederationPlannerBuilder.buildHepPlanner(); + hepPlanner.setRoot(rewrittenPlan); return hepPlanner.findBestExp(); } - private RelNode optimize(final RelNode rewritePlan, final SqlToRelConverter converter) { + private RelNode optimize(final RelNode rewritePlan, final SQLFederationRelConverter converter) { RelOptPlanner planner = converter.getCluster().getPlanner(); if (rewritePlan.getTraitSet().equals(converter.getCluster().traitSet().replace(convention))) { planner.setRoot(rewritePlan); diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutor.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementor.java similarity index 81% rename from kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutor.java rename to kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementor.java index 3fe845b6871..95624c4b0db 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutor.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementor.java @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table; +package org.apache.shardingsphere.sqlfederation.compiler.implementor; import org.apache.calcite.linq4j.Enumerable; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; /** - * Scan executor interface. + * Scan implementor interface. */ -public interface ScanExecutor { +public interface ScanImplementor { /** - * Execute. + * Implement. * * @param table table meta data * @param scanContext push down table scan context * @return query results */ - Enumerable<Object> execute(ShardingSphereTable table, ScanExecutorContext scanContext); + Enumerable<Object> implement(ShardingSphereTable table, ScanImplementorContext scanContext); } diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutorContext.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementorContext.java similarity index 87% rename from kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutorContext.java rename to kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementorContext.java index f3f9e44b2ac..01d9773d670 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutorContext.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementorContext.java @@ -15,18 +15,18 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table; +package org.apache.shardingsphere.sqlfederation.compiler.implementor; import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.calcite.DataContext; /** - * Scan executor context. + * Scan implementor context. */ @RequiredArgsConstructor @Getter -public final class ScanExecutorContext { +public final class ScanImplementorContext { private final DataContext root; diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/EmptyRowEnumerator.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/enumerator/EmptyDataRowEnumerator.java similarity index 85% rename from kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/EmptyRowEnumerator.java rename to kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/enumerator/EmptyDataRowEnumerator.java index e4c6b65ff6e..f4426f56162 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/EmptyRowEnumerator.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/enumerator/EmptyDataRowEnumerator.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table; +package org.apache.shardingsphere.sqlfederation.compiler.implementor.enumerator; import org.apache.calcite.linq4j.Enumerator; /** - * Empty row enumerator. + * Empty data row enumerator. */ -public final class EmptyRowEnumerator implements Enumerator<Object> { +public final class EmptyDataRowEnumerator implements Enumerator<Object> { @Override public Object current() { diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/SQLFederationTable.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/SQLFederationTable.java index 3a990e7f7e6..547d721a55c 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/SQLFederationTable.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/SQLFederationTable.java @@ -46,9 +46,9 @@ import org.apache.calcite.schema.impl.AbstractTable; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.EmptyRowEnumerator; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutor; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutorContext; +import org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementor; +import org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementorContext; +import org.apache.shardingsphere.sqlfederation.compiler.implementor.enumerator.EmptyDataRowEnumerator; import org.apache.shardingsphere.sqlfederation.compiler.metadata.datatype.SQLFederationDataTypeBuilder; import java.lang.reflect.Type; @@ -62,7 +62,7 @@ import java.util.List; @RequiredArgsConstructor public final class SQLFederationTable extends AbstractTable implements ModifiableTable, TranslatableTable { - private static final TransmittableThreadLocal<ScanExecutor> SCAN_EXECUTOR_HOLDER = new TransmittableThreadLocal<>(); + private static final TransmittableThreadLocal<ScanImplementor> SCAN_EXECUTOR_HOLDER = new TransmittableThreadLocal<>(); private final ShardingSphereTable table; @@ -94,18 +94,18 @@ public final class SQLFederationTable extends AbstractTable implements Modifiabl } /** - * Execute. + * Implement. * * @param root data context * @param sql sql * @param paramIndexes param indexes * @return enumerable result */ - public Enumerable<Object> execute(final DataContext root, final String sql, final int[] paramIndexes) { + public Enumerable<Object> implement(final DataContext root, final String sql, final int[] paramIndexes) { if (null == SCAN_EXECUTOR_HOLDER.get()) { return createEmptyEnumerable(); } - return SCAN_EXECUTOR_HOLDER.get().execute(table, new ScanExecutorContext(root, sql, paramIndexes)); + return SCAN_EXECUTOR_HOLDER.get().implement(table, new ScanImplementorContext(root, sql, paramIndexes)); } private AbstractEnumerable<Object> createEmptyEnumerable() { @@ -113,7 +113,7 @@ public final class SQLFederationTable extends AbstractTable implements Modifiabl @Override public Enumerator<Object> enumerator() { - return new EmptyRowEnumerator(); + return new EmptyDataRowEnumerator(); } }; } @@ -138,10 +138,10 @@ public final class SQLFederationTable extends AbstractTable implements Modifiabl /** * Set scan executor. * - * @param scanExecutor scan executor + * @param scanImplementor scan executor */ - public void setScanExecutor(final ScanExecutor scanExecutor) { - SCAN_EXECUTOR_HOLDER.set(scanExecutor); + public void setScanExecutor(final ScanImplementor scanImplementor) { + SCAN_EXECUTOR_HOLDER.set(scanImplementor); } /** diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanPushDownRelBuilder.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/builder/LogicalScanPushDownRelBuilder.java similarity index 95% rename from kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanPushDownRelBuilder.java rename to kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/builder/LogicalScanPushDownRelBuilder.java index a63dce7ad8e..37a27201f7e 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanPushDownRelBuilder.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/builder/LogicalScanPushDownRelBuilder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.compiler.sql.operator.util; +package org.apache.shardingsphere.sqlfederation.compiler.rel.builder; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelOptCluster; diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/util/SQLFederationValidatorUtils.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/converter/SQLFederationRelConverter.java similarity index 55% rename from kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/util/SQLFederationValidatorUtils.java rename to kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/converter/SQLFederationRelConverter.java index 742b998c3ed..5f4777390d0 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/util/SQLFederationValidatorUtils.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/converter/SQLFederationRelConverter.java @@ -15,20 +15,21 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.compiler.metadata.util; +package org.apache.shardingsphere.sqlfederation.compiler.rel.converter; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; +import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.CalciteConnectionConfig; -import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptTable.ViewExpander; import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.fun.SqlLibrary; import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory; @@ -38,25 +39,25 @@ import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.calcite.sql2rel.SqlToRelConverter.Config; import org.apache.calcite.sql2rel.StandardConvertletTable; -import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.parser.rule.SQLParserRule; -import org.apache.shardingsphere.sqlfederation.compiler.sql.function.SQLFederationFunctionRegister; -import org.apache.shardingsphere.sqlfederation.compiler.sql.function.mysql.MySQLOperatorTable; +import org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext; import org.apache.shardingsphere.sqlfederation.compiler.metadata.catalog.SQLFederationCatalogReader; +import org.apache.shardingsphere.sqlfederation.compiler.metadata.datatype.SQLFederationDataTypeFactory; import org.apache.shardingsphere.sqlfederation.compiler.metadata.view.ShardingSphereViewExpander; import org.apache.shardingsphere.sqlfederation.compiler.planner.builder.SQLFederationPlannerBuilder; +import org.apache.shardingsphere.sqlfederation.compiler.sql.function.mysql.MySQLOperatorTable; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; /** - * SQL federation validator utility class. + * SQL federation rel converter. */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class SQLFederationValidatorUtils { +public final class SQLFederationRelConverter { private static final Map<String, SqlLibrary> DATABASE_TYPE_SQL_LIBRARIES = new HashMap<>(); @@ -67,35 +68,19 @@ public final class SQLFederationValidatorUtils { DATABASE_TYPE_SQL_LIBRARIES.put("Oracle", SqlLibrary.ORACLE); } - /** - * Create catalog reader. - * - * @param schemaName schema name - * @param schema schema - * @param relDataTypeFactory rel data type factory - * @param connectionConfig connection config - * @param databaseType database type - * @return calcite catalog reader - */ - public static CalciteCatalogReader createCatalogReader(final String schemaName, final Schema schema, final RelDataTypeFactory relDataTypeFactory, final CalciteConnectionConfig connectionConfig, - final DatabaseType databaseType) { - CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); - rootSchema.add(schemaName, schema); - DatabaseTypedSPILoader.findService(SQLFederationFunctionRegister.class, databaseType).ifPresent(optional -> optional.registerFunction(rootSchema.plus(), schemaName)); - return new SQLFederationCatalogReader(rootSchema, Collections.singletonList(schemaName), relDataTypeFactory, connectionConfig); + private final SqlToRelConverter sqlToRelConverter; + + public SQLFederationRelConverter(final CompilerContext compilerContext, final List<String> schemaPath, final DatabaseType databaseType, final Convention convention) { + JavaTypeFactory typeFactory = SQLFederationDataTypeFactory.getInstance(); + CalciteConnectionConfig connectionConfig = compilerContext.getConnectionConfig(); + CalciteCatalogReader catalogReader = new SQLFederationCatalogReader(compilerContext.getCalciteSchema(), schemaPath, typeFactory, connectionConfig); + SqlValidator validator = createSqlValidator(catalogReader, typeFactory, databaseType, connectionConfig); + RelOptCluster relOptCluster = createRelOptCluster(typeFactory, convention); + sqlToRelConverter = createSqlToRelConverter(catalogReader, validator, relOptCluster, compilerContext.getSqlParserRule(), databaseType, true); } - /** - * Create sql validator. - * - * @param catalogReader catalog reader - * @param relDataTypeFactory rel data type factory - * @param databaseType database type - * @param connectionConfig connection config - * @return sql validator - */ - public static SqlValidator createSqlValidator(final CalciteCatalogReader catalogReader, final RelDataTypeFactory relDataTypeFactory, final DatabaseType databaseType, - final CalciteConnectionConfig connectionConfig) { + private SqlValidator createSqlValidator(final CalciteCatalogReader catalogReader, final RelDataTypeFactory relDataTypeFactory, final DatabaseType databaseType, + final CalciteConnectionConfig connectionConfig) { SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT.withLenientOperatorLookup(connectionConfig.lenientOperatorLookup()).withConformance(connectionConfig.conformance()) .withDefaultNullCollation(connectionConfig.defaultNullCollation()).withIdentifierExpansion(true); SqlOperatorTable sqlOperatorTable = getSQLOperatorTable(catalogReader, databaseType.getTrunkDatabaseType().orElse(databaseType)); @@ -103,41 +88,63 @@ public final class SQLFederationValidatorUtils { } private static SqlOperatorTable getSQLOperatorTable(final CalciteCatalogReader catalogReader, final DatabaseType databaseType) { - return SqlOperatorTables.chain(Arrays.asList(new MySQLOperatorTable(), - SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(Arrays.asList(SqlLibrary.STANDARD, DATABASE_TYPE_SQL_LIBRARIES.getOrDefault(databaseType.getType(), SqlLibrary.MYSQL))), - catalogReader)); + SqlOperatorTable operatorTable = + SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(Arrays.asList(SqlLibrary.STANDARD, DATABASE_TYPE_SQL_LIBRARIES.getOrDefault(databaseType.getType(), SqlLibrary.MYSQL))); + return SqlOperatorTables.chain(Arrays.asList(new MySQLOperatorTable(), operatorTable, catalogReader)); } - /** - * Create sql to rel converter. - * - * @param catalogReader catalog reader - * @param validator validator - * @param cluster cluster - * @param sqlParserRule sql parser rule - * @param databaseType database type - * @param needsViewExpand whether sql needs view expand or not - * @return sql to rel converter - */ - public static SqlToRelConverter createSqlToRelConverter(final CalciteCatalogReader catalogReader, final SqlValidator validator, final RelOptCluster cluster, final SQLParserRule sqlParserRule, - final DatabaseType databaseType, final boolean needsViewExpand) { - ViewExpander expander = - needsViewExpand ? new ShardingSphereViewExpander(sqlParserRule, databaseType, createSqlToRelConverter(catalogReader, validator, cluster, sqlParserRule, databaseType, false)) - : (rowType, queryString, schemaPath, viewPath) -> null; + private SqlToRelConverter createSqlToRelConverter(final CalciteCatalogReader catalogReader, final SqlValidator validator, final RelOptCluster cluster, final SQLParserRule sqlParserRule, + final DatabaseType databaseType, final boolean needsViewExpand) { + ViewExpander expander = needsViewExpand + ? new ShardingSphereViewExpander(sqlParserRule, databaseType, createSqlToRelConverter(catalogReader, validator, cluster, sqlParserRule, databaseType, false)) + : (rowType, queryString, schemaPath, viewPath) -> null; // TODO remove withRemoveSortInSubQuery when calcite can expand view which contains order by correctly Config converterConfig = SqlToRelConverter.config().withTrimUnusedFields(true).withRemoveSortInSubQuery(false).withExpand(true); return new SqlToRelConverter(expander, validator, catalogReader, cluster, StandardConvertletTable.INSTANCE, converterConfig); } + private RelOptCluster createRelOptCluster(final RelDataTypeFactory relDataTypeFactory, final Convention convention) { + RelOptPlanner volcanoPlanner = SQLFederationPlannerBuilder.buildVolcanoPlanner(convention); + return RelOptCluster.create(volcanoPlanner, new RexBuilder(relDataTypeFactory)); + } + /** - * Create rel opt cluster. + * Get schema plus. * - * @param relDataTypeFactory rel data type factory - * @param convention convention - * @return rel opt cluster + * @return schema plus */ - public static RelOptCluster createRelOptCluster(final RelDataTypeFactory relDataTypeFactory, final Convention convention) { - RelOptPlanner volcanoPlanner = SQLFederationPlannerBuilder.buildVolcanoPlanner(convention); - return RelOptCluster.create(volcanoPlanner, new RexBuilder(relDataTypeFactory)); + public SchemaPlus getSchemaPlus() { + return sqlToRelConverter.validator.getCatalogReader().getRootSchema().plus(); + } + + /** + * Convert query. + * + * @param sqlNode sql node + * @param needsValidation need validation + * @param top top + * @return rel root + */ + public RelRoot convertQuery(final SqlNode sqlNode, final boolean needsValidation, final boolean top) { + return sqlToRelConverter.convertQuery(sqlNode, needsValidation, top); + } + + /** + * Get validated node type. + * + * @param sqlNode sql node + * @return rel data type + */ + public RelDataType getValidatedNodeType(final SqlNode sqlNode) { + return Objects.requireNonNull(sqlToRelConverter.validator).getValidatedNodeType(sqlNode); + } + + /** + * Get cluster. + * + * @return cluster + */ + public RelOptCluster getCluster() { + return sqlToRelConverter.getCluster(); } } diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanRelShuttle.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/rewriter/LogicalScanRelRewriter.java similarity index 79% rename from kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanRelShuttle.java rename to kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/rewriter/LogicalScanRelRewriter.java index 0d99e240a4c..4c024759a58 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanRelShuttle.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/rewriter/LogicalScanRelRewriter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.compiler.sql.operator.util; +package org.apache.shardingsphere.sqlfederation.compiler.rel.rewriter; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; @@ -25,10 +25,10 @@ import org.apache.calcite.rel.core.TableScan; import org.apache.shardingsphere.sqlfederation.compiler.sql.operator.logical.LogicalScan; /** - * Logical scan rel shuttle. + * Logical scan rel rewriter. */ @RequiredArgsConstructor(access = AccessLevel.PRIVATE) -public final class LogicalScanRelShuttle extends RelShuttleImpl { +public final class LogicalScanRelRewriter extends RelShuttleImpl { private final String databaseType; @@ -38,13 +38,13 @@ public final class LogicalScanRelShuttle extends RelShuttleImpl { } /** - * Replace table scan with logical scan. + * Rewrite table scan to logical scan. * * @param relNode rel node * @param databaseType database type - * @return replaced rel node + * @return rewritten rel node */ - public static RelNode replace(final RelNode relNode, final String databaseType) { - return relNode.accept(new LogicalScanRelShuttle(databaseType)); + public static RelNode rewrite(final RelNode relNode, final String databaseType) { + return relNode.accept(new LogicalScanRelRewriter(databaseType)); } } diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/logical/LogicalScan.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/logical/LogicalScan.java index 1d331840a71..ecff13c9a01 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/logical/LogicalScan.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/logical/LogicalScan.java @@ -24,7 +24,7 @@ import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalTableScan; -import org.apache.shardingsphere.sqlfederation.compiler.sql.operator.util.LogicalScanPushDownRelBuilder; +import org.apache.shardingsphere.sqlfederation.compiler.rel.builder.LogicalScanPushDownRelBuilder; import java.util.Collections; import java.util.Objects; diff --git a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/physical/EnumerableScan.java b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/physical/EnumerableScan.java index 91753425e3a..09713b01c88 100644 --- a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/physical/EnumerableScan.java +++ b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/physical/EnumerableScan.java @@ -84,7 +84,7 @@ public final class EnumerableScan extends TableScan implements EnumerableRel { public Result implement(final EnumerableRelImplementor implementor, final Prefer pref) { PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getPushDownRowType(), pref.preferArray()); int[] paramIndexes = null == sqlString.getDynamicParameters() ? new int[]{} : getParamIndexes(sqlString.getDynamicParameters()); - return implementor.result(physType, Blocks.toBlock(Expressions.call(Objects.requireNonNull(table.getExpression(SQLFederationTable.class)), "execute", implementor.getRootExpression(), + return implementor.result(physType, Blocks.toBlock(Expressions.call(Objects.requireNonNull(table.getExpression(SQLFederationTable.class)), "implement", implementor.getRootExpression(), Expressions.constant(sqlString.getSql().replace("u&'\\", "'\\u")), Expressions.constant(paramIndexes)))); } diff --git a/kernel/sql-federation/compiler/src/test/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/it/SQLStatementCompilerIT.java b/kernel/sql-federation/compiler/src/test/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/it/SQLStatementCompilerIT.java index 1fe8747357f..d2073118c67 100644 --- a/kernel/sql-federation/compiler/src/test/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/it/SQLStatementCompilerIT.java +++ b/kernel/sql-federation/compiler/src/test/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/it/SQLStatementCompilerIT.java @@ -19,14 +19,8 @@ package org.apache.shardingsphere.sqlfederation.compiler.compiler.it; import lombok.SneakyThrows; import org.apache.calcite.adapter.enumerable.EnumerableConvention; -import org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.config.CalciteConnectionConfigImpl; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.prepare.CalciteCatalogReader; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.sql.validate.SqlValidator; -import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.jdbc.CalciteSchema; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema; @@ -36,10 +30,10 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule; import org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder; import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement; import org.apache.shardingsphere.sqlfederation.compiler.compiler.SQLStatementCompiler; +import org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext; import org.apache.shardingsphere.sqlfederation.compiler.metadata.datatype.SQLFederationDataTypeFactory; import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.SQLFederationSchema; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.util.SQLFederationValidatorUtils; -import org.apache.shardingsphere.sqlfederation.compiler.planner.builder.SQLFederationPlannerBuilder; +import org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; @@ -83,8 +77,12 @@ class SQLStatementCompilerIT { tables.add(createTProductDetailMetaData()); tables.add(createMultiTypesFirstTableMetaData()); tables.add(createMultiTypesSecondTableMetaData()); - sqlStatementCompiler = - new SQLStatementCompiler(createSqlToRelConverter(new ShardingSphereSchema("foo_db", tables, Collections.emptyList())), EnumerableConvention.INSTANCE); + CalciteSchema calciteSchema = CalciteSchema.createRootSchema(true); + DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "H2"); + calciteSchema.add(SCHEMA_NAME, + new SQLFederationSchema(SCHEMA_NAME, new ShardingSphereSchema("foo_db", tables, Collections.emptyList()), databaseType, SQLFederationDataTypeFactory.getInstance())); + sqlStatementCompiler = new SQLStatementCompiler(new SQLFederationRelConverter(new CompilerContext(mock(SQLParserRule.class), calciteSchema, new CalciteConnectionConfigImpl(new Properties())), + Collections.singletonList("federate_jdbc"), databaseType, EnumerableConvention.INSTANCE), EnumerableConvention.INSTANCE); } private ShardingSphereTable createOrderFederationTableMetaData() { @@ -233,18 +231,6 @@ class SQLStatementCompilerIT { Collections.emptyList(), Collections.emptyList()); } - private SqlToRelConverter createSqlToRelConverter(final ShardingSphereSchema schema) { - CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(new Properties()); - RelDataTypeFactory relDataTypeFactory = SQLFederationDataTypeFactory.getInstance(); - DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "H2"); - SQLFederationSchema sqlFederationSchema = new SQLFederationSchema(SCHEMA_NAME, schema, databaseType, SQLFederationDataTypeFactory.getInstance()); - CalciteCatalogReader catalogReader = - SQLFederationValidatorUtils.createCatalogReader(SCHEMA_NAME, sqlFederationSchema, relDataTypeFactory, connectionConfig, databaseType); - SqlValidator validator = SQLFederationValidatorUtils.createSqlValidator(catalogReader, relDataTypeFactory, databaseType, connectionConfig); - RelOptCluster cluster = RelOptCluster.create(SQLFederationPlannerBuilder.buildVolcanoPlanner(EnumerableConvention.INSTANCE), new RexBuilder(relDataTypeFactory)); - return SQLFederationValidatorUtils.createSqlToRelConverter(catalogReader, validator, cluster, mock(SQLParserRule.class), databaseType, false); - } - @ParameterizedTest(name = "{0}") @ArgumentsSource(TestCaseArgumentsProvider.class) void assertCompile(final TestCase testcase) { diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationContext.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/context/SQLFederationContext.java similarity index 95% rename from kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationContext.java rename to kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/context/SQLFederationContext.java index bce17720cdc..958bd470093 100644 --- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationContext.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/context/SQLFederationContext.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.executor.context; +package org.apache.shardingsphere.sqlfederation.context; import lombok.Getter; import lombok.RequiredArgsConstructor; diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java index 007c2da6135..ab7c0d68554 100644 --- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java @@ -19,20 +19,13 @@ package org.apache.shardingsphere.sqlfederation.engine; import com.google.common.base.Joiner; import lombok.Getter; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.prepare.CalciteCatalogReader; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.validate.SqlValidator; -import org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; import org.apache.shardingsphere.infra.binder.context.statement.dal.ExplainStatementContext; import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext; import org.apache.shardingsphere.infra.binder.context.type.TableAvailable; import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData; -import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.datanode.DataNode; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; @@ -54,18 +47,16 @@ import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader; import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment; import org.apache.shardingsphere.sql.parser.statement.core.statement.dml.SelectStatement; -import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor; -import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessorFactory; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; -import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan; import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationCompilerEngine; +import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan; import org.apache.shardingsphere.sqlfederation.compiler.compiler.SQLStatementCompiler; import org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext; import org.apache.shardingsphere.sqlfederation.compiler.exception.SQLFederationUnsupportedSQLException; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.catalog.SQLFederationCatalogReader; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.datatype.SQLFederationDataTypeFactory; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.util.SQLFederationValidatorUtils; import org.apache.shardingsphere.sqlfederation.compiler.planner.cache.ExecutionPlanCacheKey; +import org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter; +import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor; +import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessorFactory; +import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext; import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule; import org.apache.shardingsphere.sqlfederation.spi.SQLFederationDecider; @@ -113,7 +104,7 @@ public final class SQLFederationEngine implements AutoCloseable { this.currentDatabaseName = currentDatabaseName; this.currentSchemaName = currentSchemaName; sqlFederationRule = metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class); - processor = SQLFederationProcessorFactory.getInstance().newInstance(metaData, statistics, jdbcExecutor); + processor = SQLFederationProcessorFactory.getInstance().newInstance(statistics, jdbcExecutor); } /** @@ -193,11 +184,13 @@ public final class SQLFederationEngine implements AutoCloseable { queryContext = federationContext.getQueryContext(); try { SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext(); - SqlToRelConverter converter = creeateSQLToRelConverter(sqlStatementContext, processor.getConvention()); - schemaPlus = converter.validator.getCatalogReader().getRootSchema().plus(); - processor.prepare(prepareEngine, callback, currentDatabaseName, currentSchemaName, federationContext, sqlFederationRule.getCompilerContext(), schemaPlus); - SQLFederationExecutionPlan executionPlan = - compileQuery(converter, currentDatabaseName, currentSchemaName, federationContext.getMetaData(), sqlStatementContext, queryContext.getSql(), processor.getConvention()); + CompilerContext compilerContext = sqlFederationRule.getCompilerContext(); + SQLFederationRelConverter converter = new SQLFederationRelConverter(compilerContext, + getSchemaPath(sqlStatementContext), sqlStatementContext.getDatabaseType(), processor.getConvention()); + schemaPlus = converter.getSchemaPlus(); + processor.prepare(prepareEngine, callback, currentDatabaseName, currentSchemaName, federationContext, compilerContext, schemaPlus); + SQLFederationExecutionPlan executionPlan = compileQuery(converter, currentDatabaseName, + currentSchemaName, federationContext.getMetaData(), sqlStatementContext, queryContext.getSql(), processor.getConvention()); resultSet = processor.executePlan(prepareEngine, callback, executionPlan, converter, federationContext, schemaPlus); return resultSet; // CHECKSTYLE:OFF @@ -209,20 +202,8 @@ public final class SQLFederationEngine implements AutoCloseable { } } - private SqlToRelConverter creeateSQLToRelConverter(final SQLStatementContext sqlStatementContext, final Convention convention) { - CompilerContext compilerContext = sqlFederationRule.getCompilerContext(); - JavaTypeFactory typeFactory = SQLFederationDataTypeFactory.getInstance(); - CalciteConnectionConfig connectionConfig = compilerContext.getConnectionConfig(); - DatabaseType databaseType = sqlStatementContext.getDatabaseType(); - DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData(); - List<String> schemaPath = getSchemaPath(dialectDatabaseMetaData, sqlStatementContext); - CalciteCatalogReader catalogReader = new SQLFederationCatalogReader(compilerContext.getCalciteSchema(), schemaPath, typeFactory, connectionConfig); - SqlValidator validator = SQLFederationValidatorUtils.createSqlValidator(catalogReader, typeFactory, databaseType, connectionConfig); - RelOptCluster relOptCluster = SQLFederationValidatorUtils.createRelOptCluster(typeFactory, convention); - return SQLFederationValidatorUtils.createSqlToRelConverter(catalogReader, validator, relOptCluster, compilerContext.getSqlParserRule(), databaseType, true); - } - - private List<String> getSchemaPath(final DialectDatabaseMetaData dialectDatabaseMetaData, final SQLStatementContext sqlStatementContext) { + private List<String> getSchemaPath(final SQLStatementContext sqlStatementContext) { + DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sqlStatementContext.getDatabaseType()).getDialectDatabaseMetaData(); // TODO set default schema according to search path result if (dialectDatabaseMetaData.getSchemaOption().getDefaultSchema().isPresent()) { return sqlStatementContext instanceof TableAvailable && ((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables().stream().anyMatch(each -> each.getOwner().isPresent()) @@ -232,7 +213,7 @@ public final class SQLFederationEngine implements AutoCloseable { return Collections.singletonList(currentDatabaseName); } - private SQLFederationExecutionPlan compileQuery(final SqlToRelConverter converter, final String databaseName, final String schemaName, final ShardingSphereMetaData metaData, + private SQLFederationExecutionPlan compileQuery(final SQLFederationRelConverter converter, final String databaseName, final String schemaName, final ShardingSphereMetaData metaData, final SQLStatementContext sqlStatementContext, final String sql, final Convention convention) { SQLStatementCompiler sqlStatementCompiler = new SQLStatementCompiler(converter, convention); SQLFederationCompilerEngine compilerEngine = new SQLFederationCompilerEngine(databaseName, schemaName, sqlFederationRule.getConfiguration().getExecutionPlanCache()); diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java index 4895625ee23..8c7ad6d7059 100644 --- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java @@ -19,15 +19,15 @@ package org.apache.shardingsphere.sqlfederation.engine.processor; import org.apache.calcite.plan.Convention; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; import org.apache.shardingsphere.infra.session.query.QueryContext; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan; import org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext; +import org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter; +import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext; import java.sql.Connection; import java.sql.ResultSet; @@ -73,7 +73,7 @@ public interface SQLFederationProcessor { * @return resultset */ ResultSet executePlan(DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, JDBCExecutorCallback<? extends ExecuteResult> callback, - SQLFederationExecutionPlan executionPlan, SqlToRelConverter converter, SQLFederationContext federationContext, SchemaPlus schemaPlus); + SQLFederationExecutionPlan executionPlan, SQLFederationRelConverter converter, SQLFederationContext federationContext, SchemaPlus schemaPlus); /** * Get conversion. diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessorFactory.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessorFactory.java index 634ea5907bd..91bd0bcc8bf 100644 --- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessorFactory.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessorFactory.java @@ -20,7 +20,6 @@ package org.apache.shardingsphere.sqlfederation.engine.processor; import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; -import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics; import org.apache.shardingsphere.sqlfederation.engine.processor.impl.StandardSQLFederationProcessor; @@ -44,12 +43,11 @@ public final class SQLFederationProcessorFactory { /** * Create new instance of {@link SQLFederationProcessor}. * - * @param metaData shardingSphere meta data * @param statistics shardingSphere statistics * @param jdbcExecutor JDBC executor * @return created instance */ - public SQLFederationProcessor newInstance(final ShardingSphereMetaData metaData, final ShardingSphereStatistics statistics, final JDBCExecutor jdbcExecutor) { - return new StandardSQLFederationProcessor(metaData, statistics, jdbcExecutor); + public SQLFederationProcessor newInstance(final ShardingSphereStatistics statistics, final JDBCExecutor jdbcExecutor) { + return new StandardSQLFederationProcessor(statistics, jdbcExecutor); } } diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java index f12c70574f5..88ed595e514 100644 --- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java @@ -27,7 +27,7 @@ import org.apache.calcite.plan.Convention; import org.apache.calcite.runtime.Bindable; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; -import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.shardingsphere.infra.binder.context.segment.select.projection.Projection; import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext; import org.apache.shardingsphere.infra.binder.context.type.TableAvailable; @@ -40,19 +40,19 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; -import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics; import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment; -import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationBindContext; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext; -import org.apache.shardingsphere.sqlfederation.executor.enumerable.EnumerableScanExecutor; import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan; import org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext; import org.apache.shardingsphere.sqlfederation.compiler.exception.SQLFederationSchemaNotFoundException; import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.SQLFederationTable; +import org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter; +import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext; +import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor; +import org.apache.shardingsphere.sqlfederation.executor.context.ExecutorBindContext; +import org.apache.shardingsphere.sqlfederation.executor.context.ExecutorContext; +import org.apache.shardingsphere.sqlfederation.executor.enumerable.implementor.EnumerableScanImplementor; import org.apache.shardingsphere.sqlfederation.resultset.SQLFederationResultSet; import java.sql.Connection; @@ -69,12 +69,12 @@ import java.util.Map; @RequiredArgsConstructor public final class StandardSQLFederationProcessor implements SQLFederationProcessor { - private final ShardingSphereMetaData metaData; - private final ShardingSphereStatistics statistics; private final JDBCExecutor jdbcExecutor; + private ExecutorContext executorContext; + @Override public void prepare(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> callback, final String currentDatabaseName, final String currentSchemaName, final SQLFederationContext federationContext, final CompilerContext compilerContext, @@ -82,9 +82,9 @@ public final class StandardSQLFederationProcessor implements SQLFederationProces if (null == schemaPlus) { return; } - SQLFederationExecutorContext executorContext = new SQLFederationExecutorContext(currentDatabaseName, currentSchemaName, metaData.getProps()); - EnumerableScanExecutor scanExecutor = - new EnumerableScanExecutor(prepareEngine, jdbcExecutor, callback, compilerContext, executorContext, federationContext, metaData.getGlobalRuleMetaData(), statistics); + executorContext = + new ExecutorContext(prepareEngine, jdbcExecutor, callback, statistics, currentDatabaseName, currentSchemaName, federationContext.isPreview(), federationContext.getProcessId()); + EnumerableScanImplementor scanExecutor = new EnumerableScanImplementor(federationContext.getQueryContext(), compilerContext, executorContext); SQLStatementContext sqlStatementContext = federationContext.getQueryContext().getSqlStatementContext(); Collection<SimpleTableSegment> simpleTables = sqlStatementContext instanceof TableAvailable ? ((TableAvailable) sqlStatementContext).getTablesContext().getSimpleTables() @@ -134,14 +134,18 @@ public final class StandardSQLFederationProcessor implements SQLFederationProces @SuppressWarnings("unchecked") @Override public ResultSet executePlan(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> callback, - final SQLFederationExecutionPlan executionPlan, final SqlToRelConverter converter, final SQLFederationContext federationContext, + final SQLFederationExecutionPlan executionPlan, final SQLFederationRelConverter converter, final SQLFederationContext federationContext, final SchemaPlus schemaPlus) { Bindable<Object> executablePlan = EnumerableInterpretable.toBindable(Collections.emptyMap(), null, (EnumerableRel) executionPlan.getPhysicalPlan(), Prefer.ARRAY); Map<String, Object> params = createParameters(federationContext.getQueryContext().getParameters()); - Enumerator<Object> enumerator = executablePlan.bind(new SQLFederationBindContext(converter, params)).enumerator(); + Enumerator<Object> enumerator = executablePlan.bind(new ExecutorBindContext(converter, params)).enumerator(); SelectStatementContext selectStatementContext = (SelectStatementContext) federationContext.getQueryContext().getSqlStatementContext(); - return new SQLFederationResultSet(enumerator, schemaPlus, selectStatementContext.getProjectionsContext().getExpandProjections(), selectStatementContext.getDatabaseType(), - executionPlan.getResultColumnType()); + List<Projection> expandProjections = selectStatementContext.getProjectionsContext().getExpandProjections(); + SQLFederationResultSet result = new SQLFederationResultSet(enumerator, schemaPlus, expandProjections, selectStatementContext.getDatabaseType(), executionPlan.getResultColumnType()); + if (federationContext.isPreview()) { + federationContext.getPreviewExecutionUnits().addAll(executorContext.getPreviewExecutionUnits()); + } + return result; } private Map<String, Object> createParameters(final List<Object> params) { diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationBindContext.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorBindContext.java similarity index 85% rename from kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationBindContext.java rename to kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorBindContext.java index d896ce68fc0..45328373232 100644 --- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationBindContext.java +++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorBindContext.java @@ -23,24 +23,24 @@ import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter; import java.util.Map; /** - * SQL federation bind context. + * Executor bind context. */ @RequiredArgsConstructor -public final class SQLFederationBindContext implements DataContext { +public final class ExecutorBindContext implements DataContext { - private final SqlToRelConverter converter; + private final SQLFederationRelConverter converter; @Getter private final Map<String, Object> parameters; @Override public SchemaPlus getRootSchema() { - return converter.validator.getCatalogReader().getRootSchema().plus(); + return converter.getSchemaPlus(); } @Override diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorContext.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorContext.java new file mode 100644 index 00000000000..14fc4c57d1e --- /dev/null +++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.sqlfederation.executor.context; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; +import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; +import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; +import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine; +import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics; + +import java.sql.Connection; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; + +/** + * Executor context. + */ +@RequiredArgsConstructor +@Getter +public final class ExecutorContext { + + private final ProcessEngine processEngine = new ProcessEngine(); + + private final Map<String, Integer> connectionOffsets = new LinkedHashMap<>(); + + private final Collection<ExecutionUnit> previewExecutionUnits = new LinkedList<>(); + + private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine; + + private final JDBCExecutor jdbcExecutor; + + private final JDBCExecutorCallback<? extends ExecuteResult> callback; + + private final ShardingSphereStatistics statistics; + + private final String currentDatabaseName; + + private final String currentSchemaName; + + private final boolean preview; + + private final String processId; +} diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationExecutorContext.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationExecutorContext.java deleted file mode 100644 index b06b5773dd0..00000000000 --- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationExecutorContext.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.sqlfederation.executor.context; - -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.config.props.ConfigurationProperties; - -import java.util.LinkedHashMap; -import java.util.Map; - -/** - * SQL federation executor context. - */ -@RequiredArgsConstructor -@Getter -public final class SQLFederationExecutorContext { - - private final String currentDatabaseName; - - private final String currentSchemaName; - - private final ConfigurationProperties props; - - private final Map<String, Integer> connectionOffsets = new LinkedHashMap<>(); -} diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/JDBCRowEnumerator.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/jdbc/JDBCDataRowEnumerator.java similarity index 96% rename from kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/JDBCRowEnumerator.java rename to kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/jdbc/JDBCDataRowEnumerator.java index bb26814c93b..bbb8f361294 100644 --- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/JDBCRowEnumerator.java +++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/jdbc/JDBCDataRowEnumerator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.executor.enumerator; +package org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.jdbc; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -32,10 +32,10 @@ import java.time.LocalDateTime; import java.util.Collection; /** - * JDBC row enumerator. + * JDBC data row enumerator. */ @RequiredArgsConstructor -public final class JDBCRowEnumerator implements Enumerator<Object> { +public final class JDBCDataRowEnumerator implements Enumerator<Object> { private final MergedResult queryResult; diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/MemoryRowEnumerator.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataRowEnumerator.java similarity index 78% rename from kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/MemoryRowEnumerator.java rename to kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataRowEnumerator.java index 2f283f71b47..4bcf0b4c5f2 100644 --- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/MemoryRowEnumerator.java +++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataRowEnumerator.java @@ -15,13 +15,12 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.executor.enumerator; +package org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory; import org.apache.calcite.linq4j.Enumerator; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn; import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics; -import org.apache.shardingsphere.sqlfederation.executor.utils.EnumeratorUtils; import java.util.ArrayList; import java.util.Collection; @@ -29,9 +28,9 @@ import java.util.Iterator; import java.util.Map; /** - * Memory row enumerator. + * Memory data row enumerator. */ -public final class MemoryRowEnumerator implements Enumerator<Object> { +public final class MemoryDataRowEnumerator implements Enumerator<Object> { private final Collection<RowStatistics> rows; @@ -41,9 +40,9 @@ public final class MemoryRowEnumerator implements Enumerator<Object> { private Object current; - public MemoryRowEnumerator(final Collection<RowStatistics> rows, final Collection<ShardingSphereColumn> columns, final DatabaseType databaseType) { + public MemoryDataRowEnumerator(final Collection<RowStatistics> rows, final Collection<ShardingSphereColumn> columns, final DatabaseType databaseType) { this.rows = rows; - columnTypes = EnumeratorUtils.createColumnTypes(new ArrayList<>(columns), databaseType); + columnTypes = MemoryDataTypeConverter.createColumnTypes(new ArrayList<>(columns), databaseType); iterator = rows.iterator(); } @@ -55,7 +54,7 @@ public final class MemoryRowEnumerator implements Enumerator<Object> { @Override public boolean moveNext() { if (iterator.hasNext()) { - current = EnumeratorUtils.convertToTargetType(columnTypes, iterator.next().getRows().toArray()); + current = MemoryDataTypeConverter.convertToTargetType(columnTypes, iterator.next().getRows().toArray()); return true; } current = null; diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/EnumeratorUtils.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataTypeConverter.java similarity index 96% rename from kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/EnumeratorUtils.java rename to kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataTypeConverter.java index c67c5485cb5..fc0ba4e164d 100644 --- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/EnumeratorUtils.java +++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataTypeConverter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.executor.utils; +package org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -35,7 +35,7 @@ import java.util.Optional; * Enumerator utilities. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class EnumeratorUtils { +public final class MemoryDataTypeConverter { /** * Create column types. diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/StatisticsAssembleUtils.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryTableStatisticsBuilder.java similarity index 55% rename from kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/StatisticsAssembleUtils.java rename to kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryTableStatisticsBuilder.java index e638f61748e..28e374bcbf8 100644 --- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/StatisticsAssembleUtils.java +++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryTableStatisticsBuilder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.executor.utils; +package org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -31,61 +31,75 @@ import org.apache.shardingsphere.infra.metadata.user.Grantee; import java.util.Arrays; import java.util.Collection; +import java.util.LinkedList; /** - * Statistics assemble utils. + * Memory table statistics builder. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class StatisticsAssembleUtils { +public final class MemoryTableStatisticsBuilder { /** - * Assemble table statistics. + * Build table statistics. * * @param table table * @param metaData meta data * @param driverQuerySystemCatalogOption driver query system catalog option * @return table statistics */ - public static TableStatistics assembleTableStatistics(final ShardingSphereTable table, final ShardingSphereMetaData metaData, - final DialectDriverQuerySystemCatalogOption driverQuerySystemCatalogOption) { - TableStatistics result = new TableStatistics(table.getName()); + public static TableStatistics buildTableStatistics(final ShardingSphereTable table, final ShardingSphereMetaData metaData, + final DialectDriverQuerySystemCatalogOption driverQuerySystemCatalogOption) { if (driverQuerySystemCatalogOption.isDatabaseDataTable(table.getName())) { - assembleDatabaseData(result, metaData.getAllDatabases(), driverQuerySystemCatalogOption.getDatCompatibility()); - } else if (driverQuerySystemCatalogOption.isTableDataTable(table.getName())) { - for (ShardingSphereDatabase each : metaData.getAllDatabases()) { - assembleTableData(result, each.getAllSchemas()); - } - } else if (driverQuerySystemCatalogOption.isRoleDataTable(table.getName())) { - assembleRoleData(result, metaData); + return buildDatabaseData(table.getName(), metaData.getAllDatabases(), driverQuerySystemCatalogOption.getDatCompatibility()); } - return result; + if (driverQuerySystemCatalogOption.isTableDataTable(table.getName())) { + return buildTableData(table.getName(), metaData); + } + if (driverQuerySystemCatalogOption.isRoleDataTable(table.getName())) { + return buildRoleData(table.getName(), metaData); + } + return new TableStatistics(table.getName()); } - private static void assembleDatabaseData(final TableStatistics tableStatistics, final Collection<ShardingSphereDatabase> databases, final String datCompatibility) { + private static TableStatistics buildDatabaseData(final String tableName, final Collection<ShardingSphereDatabase> databases, final String datCompatibility) { + TableStatistics result = new TableStatistics(tableName); for (ShardingSphereDatabase each : databases) { Object[] rows = new Object[15]; rows[0] = each.getName(); rows[11] = datCompatibility; - tableStatistics.getRows().add(new RowStatistics(Arrays.asList(rows))); + result.getRows().add(new RowStatistics(Arrays.asList(rows))); } + return result; } - private static void assembleTableData(final TableStatistics tableStatistics, final Collection<ShardingSphereSchema> schemas) { + private static TableStatistics buildTableData(final String tableName, final ShardingSphereMetaData metaData) { + TableStatistics result = new TableStatistics(tableName); + for (ShardingSphereDatabase each : metaData.getAllDatabases()) { + result.getRows().addAll(buildTableData(each.getAllSchemas())); + } + return result; + } + + private static Collection<RowStatistics> buildTableData(final Collection<ShardingSphereSchema> schemas) { + Collection<RowStatistics> result = new LinkedList<>(); for (ShardingSphereSchema schema : schemas) { for (ShardingSphereTable each : schema.getAllTables()) { Object[] rows = new Object[10]; rows[0] = schema.getName(); rows[1] = each.getName(); - tableStatistics.getRows().add(new RowStatistics(Arrays.asList(rows))); + result.add(new RowStatistics(Arrays.asList(rows))); } } + return result; } - private static void assembleRoleData(final TableStatistics tableStatistics, final ShardingSphereMetaData metaData) { + private static TableStatistics buildRoleData(final String tableName, final ShardingSphereMetaData metaData) { + TableStatistics result = new TableStatistics(tableName); for (Grantee each : metaData.getGlobalRuleMetaData().getSingleRule(AuthorityRule.class).getGrantees()) { Object[] rows = new Object[27]; rows[0] = each.getUsername(); - tableStatistics.getRows().add(new RowStatistics(Arrays.asList(rows))); + result.getRows().add(new RowStatistics(Arrays.asList(rows))); } + return result; } } diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java similarity index 70% rename from kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java rename to kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java index 6e441ce3aed..7d569fd774c 100644 --- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java +++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.executor.enumerable; +package org.apache.shardingsphere.sqlfederation.executor.enumerable.implementor; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -38,11 +38,7 @@ import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupRepor import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext; import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; -import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult; -import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine; import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.hint.HintValueContext; @@ -50,23 +46,19 @@ import org.apache.shardingsphere.infra.merge.MergeEngine; import org.apache.shardingsphere.infra.merge.result.MergedResult; import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; -import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; -import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics; import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics; import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext; -import org.apache.shardingsphere.sqlfederation.executor.enumerator.JDBCRowEnumerator; -import org.apache.shardingsphere.sqlfederation.executor.enumerator.MemoryRowEnumerator; -import org.apache.shardingsphere.sqlfederation.executor.utils.StatisticsAssembleUtils; import org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.EmptyRowEnumerator; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutor; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutorContext; +import org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementor; +import org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementorContext; +import org.apache.shardingsphere.sqlfederation.compiler.implementor.enumerator.EmptyDataRowEnumerator; +import org.apache.shardingsphere.sqlfederation.executor.context.ExecutorContext; +import org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.jdbc.JDBCDataRowEnumerator; +import org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory.MemoryDataRowEnumerator; +import org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory.MemoryTableStatisticsBuilder; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; @@ -79,42 +71,32 @@ import java.util.Optional; import java.util.stream.Collectors; /** - * Enumerable scan executor. + * Enumerable scan implementor. */ @RequiredArgsConstructor -public final class EnumerableScanExecutor implements ScanExecutor { +public final class EnumerableScanImplementor implements ScanImplementor { - private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine; - - private final JDBCExecutor jdbcExecutor; - - private final JDBCExecutorCallback<? extends ExecuteResult> callback; + private final QueryContext queryContext; private final CompilerContext compilerContext; - private final SQLFederationExecutorContext executorContext; - - private final SQLFederationContext federationContext; - - private final RuleMetaData globalRuleMetaData; - - private final ShardingSphereStatistics statistics; + private final ExecutorContext executorContext; private final ProcessEngine processEngine = new ProcessEngine(); @Override - public Enumerable<Object> execute(final ShardingSphereTable table, final ScanExecutorContext scanContext) { - SQLStatementContext sqlStatementContext = federationContext.getQueryContext().getSqlStatementContext(); + public Enumerable<Object> implement(final ShardingSphereTable table, final ScanImplementorContext scanContext) { + SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext(); if (containsSystemSchema(sqlStatementContext)) { return createMemoryEnumerable(sqlStatementContext, table); } - QueryContext queryContext = createQueryContext(federationContext.getMetaData(), scanContext, sqlStatementContext.getDatabaseType(), federationContext.getQueryContext().isUseCache()); - ExecutionContext executionContext = new KernelProcessor().generateExecutionContext(queryContext, globalRuleMetaData, executorContext.getProps()); - if (federationContext.isPreview()) { - federationContext.getPreviewExecutionUnits().addAll(executionContext.getExecutionUnits()); + QueryContext scanQueryContext = createQueryContext(queryContext.getMetaData(), scanContext, sqlStatementContext.getDatabaseType(), queryContext.isUseCache()); + ExecutionContext executionContext = new KernelProcessor().generateExecutionContext(scanQueryContext, queryContext.getMetaData().getGlobalRuleMetaData(), queryContext.getMetaData().getProps()); + if (executorContext.isPreview()) { + executorContext.getPreviewExecutionUnits().addAll(executionContext.getExecutionUnits()); return createEmptyEnumerable(); } - return createJDBCEnumerable(queryContext, federationContext.getMetaData().getDatabase(executorContext.getCurrentDatabaseName()), executionContext); + return createJDBCEnumerable(scanQueryContext, queryContext.getMetaData().getDatabase(executorContext.getCurrentDatabaseName()), executionContext); } private boolean containsSystemSchema(final SQLStatementContext sqlStatementContext) { @@ -128,29 +110,33 @@ public final class EnumerableScanExecutor implements ScanExecutor { return false; } - private AbstractEnumerable<Object> createJDBCEnumerable(final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext context) { + private AbstractEnumerable<Object> createJDBCEnumerable(final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext executionContext) { return new AbstractEnumerable<Object>() { @SneakyThrows @Override public Enumerator<Object> enumerator() { - computeConnectionOffsets(context); - // TODO pass grantee from proxy and jdbc adapter - ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare( - database.getName(), context.getRouteContext(), executorContext.getConnectionOffsets(), - context.getExecutionUnits(), new ExecutionGroupReportContext(federationContext.getProcessId(), database.getName())); + computeConnectionOffsets(executionContext); + ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepare(database, executionContext); setParameters(executionGroupContext.getInputGroups()); - ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(federationContext.getProcessId()).isInterrupted(), SQLExecutionInterruptedException::new); - processEngine.executeSQL(executionGroupContext, federationContext.getQueryContext()); - List<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList()); - MergeEngine mergeEngine = new MergeEngine(federationContext.getMetaData(), database, executorContext.getProps(), federationContext.getQueryContext().getConnectionContext()); + ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(executorContext.getProcessId()).isInterrupted(), SQLExecutionInterruptedException::new); + processEngine.executeSQL(executionGroupContext, queryContext); + List<QueryResult> queryResults = + executorContext.getJdbcExecutor().execute(executionGroupContext, executorContext.getCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList()); + MergeEngine mergeEngine = new MergeEngine(queryContext.getMetaData(), database, queryContext.getMetaData().getProps(), queryContext.getConnectionContext()); MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext()); Collection<Statement> statements = getStatements(executionGroupContext.getInputGroups()); - return new JDBCRowEnumerator(mergedResult, queryResults.get(0).getMetaData(), statements); + return new JDBCDataRowEnumerator(mergedResult, queryResults.get(0).getMetaData(), statements); } }; } + private ExecutionGroupContext<JDBCExecutionUnit> prepare(final ShardingSphereDatabase database, final ExecutionContext executionContext) throws SQLException { + // TODO pass grantee from proxy and jdbc adapter + return executorContext.getPrepareEngine().prepare(database.getName(), executionContext.getRouteContext(), executorContext.getConnectionOffsets(), executionContext.getExecutionUnits(), + new ExecutionGroupReportContext(executorContext.getProcessId(), database.getName())); + } + private void computeConnectionOffsets(final ExecutionContext context) { for (ExecutionUnit each : context.getExecutionUnits()) { if (executorContext.getConnectionOffsets().containsKey(each.getDataSourceName())) { @@ -166,13 +152,13 @@ public final class EnumerableScanExecutor implements ScanExecutor { DatabaseType databaseType = sqlStatementContext.getDatabaseType(); Optional<DialectDriverQuerySystemCatalogOption> driverQuerySystemCatalogOption = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getDriverQuerySystemCatalogOption(); if (driverQuerySystemCatalogOption.isPresent() && driverQuerySystemCatalogOption.get().isSystemTable(table.getName())) { - return createMemoryEnumerator(StatisticsAssembleUtils.assembleTableStatistics(table, federationContext.getMetaData(), driverQuerySystemCatalogOption.get()), table, databaseType); + return createMemoryEnumerator(MemoryTableStatisticsBuilder.buildTableStatistics(table, queryContext.getMetaData(), driverQuerySystemCatalogOption.get()), table, databaseType); } ShardingSpherePreconditions.checkState(sqlStatementContext instanceof TableAvailable, () -> new IllegalStateException(String.format("Can not support %s in sql federation", sqlStatementContext.getSqlStatement().getClass().getSimpleName()))); String databaseName = ((TableAvailable) sqlStatementContext).getTablesContext().getDatabaseName().orElse(executorContext.getCurrentDatabaseName()); String schemaName = ((TableAvailable) sqlStatementContext).getTablesContext().getSchemaName().orElse(executorContext.getCurrentSchemaName()); - Optional<TableStatistics> tableStatistics = Optional.ofNullable(statistics.getDatabaseStatistics(databaseName)) + Optional<TableStatistics> tableStatistics = Optional.ofNullable(executorContext.getStatistics().getDatabaseStatistics(databaseName)) .map(optional -> optional.getSchemaStatistics(schemaName)).map(optional -> optional.getTableStatistics(table.getName())); return tableStatistics.map(optional -> createMemoryEnumerator(optional, table, databaseType)).orElseGet(this::createEmptyEnumerable); } @@ -182,7 +168,7 @@ public final class EnumerableScanExecutor implements ScanExecutor { @Override public Enumerator<Object> enumerator() { - return new MemoryRowEnumerator(tableStatistics.getRows(), table.getAllColumns(), databaseType); + return new MemoryDataRowEnumerator(tableStatistics.getRows(), table.getAllColumns(), databaseType); } }; } @@ -215,13 +201,13 @@ public final class EnumerableScanExecutor implements ScanExecutor { } } - private QueryContext createQueryContext(final ShardingSphereMetaData metaData, final ScanExecutorContext sqlString, final DatabaseType databaseType, final boolean useCache) { + private QueryContext createQueryContext(final ShardingSphereMetaData metaData, final ScanImplementorContext sqlString, final DatabaseType databaseType, final boolean useCache) { String sql = sqlString.getSql().replace(System.lineSeparator(), " "); SQLStatement sqlStatement = compilerContext.getSqlParserRule().getSQLParserEngine(databaseType).parse(sql, useCache); List<Object> params = getParameters(sqlString.getParamIndexes()); HintValueContext hintValueContext = new HintValueContext(); SQLStatementContext sqlStatementContext = new SQLBindEngine(metaData, executorContext.getCurrentDatabaseName(), hintValueContext).bind(sqlStatement, params); - return new QueryContext(sqlStatementContext, sql, params, hintValueContext, federationContext.getQueryContext().getConnectionContext(), metaData, useCache); + return new QueryContext(sqlStatementContext, sql, params, hintValueContext, queryContext.getConnectionContext(), metaData, useCache); } private List<Object> getParameters(final int[] paramIndexes) { @@ -230,7 +216,7 @@ public final class EnumerableScanExecutor implements ScanExecutor { } List<Object> result = new ArrayList<>(paramIndexes.length); for (int each : paramIndexes) { - result.add(federationContext.getQueryContext().getParameters().get(each)); + result.add(queryContext.getParameters().get(each)); } return result; } @@ -240,7 +226,7 @@ public final class EnumerableScanExecutor implements ScanExecutor { @Override public Enumerator<Object> enumerator() { - return new EmptyRowEnumerator(); + return new EmptyDataRowEnumerator(); } }; } diff --git a/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java b/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/executor/EnumerableScanImplementorTest.java similarity index 77% rename from kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java rename to kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/executor/EnumerableScanImplementorTest.java index 941bd0f7b1f..a7dd882a505 100644 --- a/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java +++ b/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/executor/EnumerableScanImplementorTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.executor.enumerable; +package org.apache.shardingsphere.sqlfederation.executor.executor; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; @@ -28,11 +28,12 @@ import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics; import org.apache.shardingsphere.infra.metadata.statistics.SchemaStatistics; import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics; import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics; +import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext; import org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext; -import org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutorContext; +import org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementorContext; +import org.apache.shardingsphere.sqlfederation.executor.context.ExecutorContext; +import org.apache.shardingsphere.sqlfederation.executor.enumerable.implementor.EnumerableScanImplementor; import org.junit.jupiter.api.Test; import java.sql.Types; @@ -46,30 +47,23 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class EnumerableScanExecutorTest { +class EnumerableScanImplementorTest { @Test - void assertExecuteWithStatistics() { + void assertImplementWithStatistics() { CompilerContext compilerContext = mock(CompilerContext.class, RETURNS_DEEP_STUBS); - SQLFederationExecutorContext executorContext = mock(SQLFederationExecutorContext.class); + ExecutorContext executorContext = mock(ExecutorContext.class); when(executorContext.getCurrentDatabaseName()).thenReturn("foo_db"); when(executorContext.getCurrentSchemaName()).thenReturn("pg_catalog"); - ShardingSphereStatistics statistics = mock(ShardingSphereStatistics.class, RETURNS_DEEP_STUBS); - DatabaseStatistics databaseStatistics = mock(DatabaseStatistics.class, RETURNS_DEEP_STUBS); - when(statistics.getDatabaseStatistics("foo_db")).thenReturn(databaseStatistics); - SchemaStatistics schemaStatistics = mock(SchemaStatistics.class, RETURNS_DEEP_STUBS); - when(databaseStatistics.getSchemaStatistics("pg_catalog")).thenReturn(schemaStatistics); - TableStatistics tableStatistics = mock(TableStatistics.class); - when(tableStatistics.getRows()).thenReturn(Collections.singletonList(new RowStatistics(Collections.singletonList(1)))); - when(schemaStatistics.getTableStatistics("test")).thenReturn(tableStatistics); + ShardingSphereStatistics statistics = mockStatistics(); + when(executorContext.getStatistics()).thenReturn(statistics); ShardingSphereTable table = mock(ShardingSphereTable.class, RETURNS_DEEP_STUBS); when(table.getName()).thenReturn("test"); when(table.getAllColumns()).thenReturn(Collections.singleton(new ShardingSphereColumn("id", Types.INTEGER, true, false, false, false, true, false))); - SQLFederationContext federationContext = mock(SQLFederationContext.class, RETURNS_DEEP_STUBS); + QueryContext queryContext = mock(QueryContext.class, RETURNS_DEEP_STUBS); SelectStatementContext selectStatementContext = mockSelectStatementContext(); - when(federationContext.getQueryContext().getSqlStatementContext()).thenReturn(selectStatementContext); - Enumerable<Object> enumerable = - new EnumerableScanExecutor(null, null, null, compilerContext, executorContext, federationContext, null, statistics).execute(table, mock(ScanExecutorContext.class)); + when(queryContext.getSqlStatementContext()).thenReturn(selectStatementContext); + Enumerable<Object> enumerable = new EnumerableScanImplementor(queryContext, compilerContext, executorContext).implement(table, mock(ScanImplementorContext.class)); try (Enumerator<Object> actual = enumerable.enumerator()) { actual.moveNext(); Object row = actual.current(); @@ -78,6 +72,18 @@ class EnumerableScanExecutorTest { } } + private ShardingSphereStatistics mockStatistics() { + ShardingSphereStatistics result = mock(ShardingSphereStatistics.class, RETURNS_DEEP_STUBS); + DatabaseStatistics databaseStatistics = mock(DatabaseStatistics.class, RETURNS_DEEP_STUBS); + when(result.getDatabaseStatistics("foo_db")).thenReturn(databaseStatistics); + SchemaStatistics schemaStatistics = mock(SchemaStatistics.class, RETURNS_DEEP_STUBS); + when(databaseStatistics.getSchemaStatistics("pg_catalog")).thenReturn(schemaStatistics); + TableStatistics tableStatistics = mock(TableStatistics.class); + when(tableStatistics.getRows()).thenReturn(Collections.singletonList(new RowStatistics(Collections.singletonList(1)))); + when(schemaStatistics.getTableStatistics("test")).thenReturn(tableStatistics); + return result; + } + private SelectStatementContext mockSelectStatementContext() { SelectStatementContext result = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS); when(result.getDatabaseType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL")); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java index 2304b7dc529..9899928c731 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java @@ -77,7 +77,7 @@ import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatemen import org.apache.shardingsphere.sql.parser.statement.core.statement.dml.DMLStatement; import org.apache.shardingsphere.sql.parser.statement.core.statement.dml.InsertStatement; import org.apache.shardingsphere.sql.parser.statement.core.statement.dml.SelectStatement; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; +import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext; import org.apache.shardingsphere.transaction.api.TransactionType; import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback; diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java index 736588e0a0b..2ce102b6ef5 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java @@ -58,7 +58,7 @@ import org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnection import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext; import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; +import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext; import java.sql.Connection; import java.sql.SQLException; diff --git a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java index ac11c30a7e6..90f194d7a49 100644 --- a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java +++ b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java @@ -48,7 +48,7 @@ import org.apache.shardingsphere.proxy.backend.session.ConnectionSession; import org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult; import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; +import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext; import java.sql.Connection; import java.sql.ResultSet;