This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit d4aadae4479bdced314236d02d55df5638d7b039 Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Mon Sep 30 20:14:39 2019 +0300 IGNITE-12248: Apache Calcite based query execution engine. --- modules/calcite/README.txt | 35 ++ modules/calcite/pom.xml | 105 ++++++ .../query/calcite/CalciteQueryProcessor.java | 162 +++++++++ .../query/calcite/prepare/DataContextImpl.java | 75 ++++ .../calcite/prepare/DistributedExecution.java | 114 +++++++ .../query/calcite/prepare/IgnitePlanner.java | 376 +++++++++++++++++++++ .../query/calcite/prepare/IgniteSqlValidator.java | 55 +++ .../processors/query/calcite/prepare/Query.java | 53 +++ .../query/calcite/prepare/QueryExecution.java | 28 ++ .../processors/query/calcite/rel/IgniteRel.java | 38 +++ .../query/calcite/rel/IgniteVisitor.java | 25 ++ .../calcite/rel/logical/IgniteLogicalFilter.java | 53 +++ .../calcite/rel/logical/IgniteLogicalJoin.java | 63 ++++ .../calcite/rel/logical/IgniteLogicalProject.java | 42 +++ .../rel/logical/IgniteLogicalTableScan.java | 37 ++ .../processors/query/calcite/rule/IgniteRules.java | 175 ++++++++++ .../query/calcite/rule/PlannerPhase.java | 49 +++ .../processors/query/calcite/rule/PlannerType.java | 26 ++ .../calcite/rule/logical/IgniteFilterRule.java | 56 +++ .../query/calcite/rule/logical/IgniteJoinRule.java | 51 +++ .../calcite/rule/logical/IgniteProjectRule.java | 62 ++++ .../schema/CalciteSchemaChangeListener.java | 64 ++++ .../query/calcite/schema/CalciteSchemaHolder.java | 39 +++ .../query/calcite/schema/IgniteSchema.java | 87 +++++ .../query/calcite/schema/IgniteTable.java | 87 +++++ .../query/calcite/schema/SchemaProvider.java | 27 ++ .../query/calcite/schema/TableDescriptor.java | 27 ++ .../processors/query/calcite/util/Commons.java | 59 ++++ .../query/calcite/util/IgniteMethod.java | 40 +++ .../query/calcite/util/ListFieldsQueryCursor.java | 92 +++++ .../query/calcite/util/ScanIterator.java | 149 ++++++++ .../query/calcite/CalciteQueryProcessorTest.java | 179 ++++++++++ .../ignite/testsuites/IgniteCalciteTestSuite.java | 32 ++ .../ignite/internal/IgniteComponentType.java | 8 + .../processors/query/GridQueryProcessor.java | 31 +- .../processors/query/GridQueryTypeDescriptor.java | 5 + .../internal/processors/query/QueryContext.java | 73 ++++ .../internal/processors/query/QueryEngine.java | 61 ++++ .../processors/query/QueryTypeDescriptorImpl.java | 6 +- .../query/schema/SchemaChangeListener.java | 35 ++ .../GridInternalSubscriptionProcessor.java | 16 + .../processors/query/h2/SchemaManager.java | 67 +++- parent/pom.xml | 1 + pom.xml | 1 + 44 files changed, 2859 insertions(+), 7 deletions(-) diff --git a/modules/calcite/README.txt b/modules/calcite/README.txt new file mode 100644 index 0000000..e144a77 --- /dev/null +++ b/modules/calcite/README.txt @@ -0,0 +1,35 @@ +Apache Ignite Calcite Module +-------------------------- + +Apache Ignite Calcite module provides experimental Apache Calcite based query engine. + +To enable Calcite module when starting a standalone node, move 'optional/ignite-calcite' folder to +'libs' folder before running 'ignite.{sh|bat}' script. The content of the module folder will +be added to classpath in this case. + +Note: At now some logic from ignite-indexing module is reused, this means ignite-indexing module also +has to be present at classpath. + +Importing Calcite Module In Maven Project +--------------------------------------- + +If you are using Maven to manage dependencies of your project, you can add Calcite module +dependency like this (replace '${ignite.version}' with actual Apache Ignite version you are +interested in): + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + ... + <dependencies> + ... + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-calcite</artifactId> + <version>${ignite.version}</version> + </dependency> + ... + </dependencies> + ... +</project> diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml new file mode 100644 index 0000000..1fa26fa --- /dev/null +++ b/modules/calcite/pom.xml @@ -0,0 +1,105 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-calcite</artifactId> + <version>2.8.0-SNAPSHOT</version> + <url>http://ignite.apache.org</url> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- + At now the new calcite engine reuses some logic + and doesn't work without "old" indexing module. + --> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-indexing</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <version>${calcite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Generate the OSGi MANIFEST.MF for this bundle. --> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java new file mode 100644 index 0000000..737f19b --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite; + +import java.util.Collections; +import java.util.List; +import org.apache.calcite.config.Lex; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.fun.SqlLibrary; +import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.QueryContext; +import org.apache.ignite.internal.processors.query.QueryEngine; +import org.apache.ignite.internal.processors.query.calcite.prepare.DistributedExecution; +import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; +import org.apache.ignite.internal.processors.query.calcite.prepare.Query; +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryExecution; +import org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaChangeListener; +import org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaHolder; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; +import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class CalciteQueryProcessor implements QueryEngine { + /** */ + private final CalciteSchemaHolder schemaHolder = new CalciteSchemaHolder(); + + /** */ + private final FrameworkConfig config; + + /** */ + private IgniteLogger log; + + /** */ + private GridKernalContext ctx; + + public CalciteQueryProcessor() { + config = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder() + // Lexical configuration defines how identifiers are quoted, whether they are converted to upper or lower + // case when they are read, and whether identifiers are matched case-sensitively. + .setLex(Lex.MYSQL) + .build()) + // Dialects support. + .operatorTable(SqlLibraryOperatorTableFactory.INSTANCE + .getOperatorTable( + SqlLibrary.STANDARD, + SqlLibrary.MYSQL)) + // Context provides a way to store data within the planner session that can be accessed in planner rules. + .context(Contexts.of(this)) + // Custom cost factory to use during optimization + .costFactory(null) + .typeSystem(RelDataTypeSystem.DEFAULT) + .build(); + } + + /** + * @param log Logger. + */ + @LoggerResource + public void setLogger(IgniteLogger log) { + this.log = log; + } + + /** {@inheritDoc} */ + @Override public void start(@NotNull GridKernalContext ctx) { + this.ctx = ctx; + + GridInternalSubscriptionProcessor prc = ctx.internalSubscriptionProcessor(); + + if (prc != null) // Stubbed context doesn't have such processor + prc.registerSchemaChangeListener(new CalciteSchemaChangeListener(schemaHolder)); + } + + /** {@inheritDoc} */ + @Override public void stop() { + } + + @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String query, Object... params) throws IgniteSQLException { + Context context = context(Commons.convert(ctx), query, params); + QueryExecution execution = prepare(context); + FieldsQueryCursor<List<?>> cur = execution.execute(); + return Collections.singletonList(cur); + } + + public FrameworkConfig config() { + return config; + } + + public IgniteLogger log() { + return log; + } + + public GridKernalContext context() { + return ctx; + } + + /** */ + public IgnitePlanner planner(RelTraitDef[] traitDefs, Context ctx) { + FrameworkConfig cfg = Frameworks.newConfigBuilder(config()) + .defaultSchema(ctx.unwrap(SchemaPlus.class)) + .traitDefs(traitDefs) + .context(ctx) + .build(); + + return new IgnitePlanner(cfg); + } + + private QueryExecution prepare(Context ctx) { + return new DistributedExecution(ctx); + } + + /** + * @param ctx External context. + * @param query Query string. + * @param params Query parameters. + * @return Query execution context. + */ + Context context(@NotNull Context ctx, String query, Object[] params) { // Package private visibility for tests. + return Contexts.chain( + config.getContext(), + Contexts.of(schemaHolder.schema(), new Query(query, params)), + ctx); + } + + /** + * @return Schema provider. + */ + CalciteSchemaHolder schemaHolder() { // Package private visibility for tests. + return schemaHolder; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java new file mode 100644 index 0000000..49b57ad --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.prepare; + +import java.util.Map; +import org.apache.calcite.DataContext; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.plan.Context; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; + +/** + * + */ +class DataContextImpl implements DataContext { + /** */ + private final JavaTypeFactoryImpl typeFactory; + + /** */ + private final SchemaPlus schema; + + /** */ + private final QueryProvider queryProvider; + + /** */ + private final Map<String, Object> params; + + /** + * @param params Parameters. + * @param ctx Query context. + */ + DataContextImpl(Map<String, Object> params, Context ctx) { + typeFactory = new JavaTypeFactoryImpl(ctx.unwrap(CalciteQueryProcessor.class).config().getTypeSystem()); + schema = ctx.unwrap(SchemaPlus.class); + queryProvider = ctx.unwrap(QueryProvider.class); + this.params = params; + } + + /** {@inheritDoc} */ + @Override public SchemaPlus getRootSchema() { + return schema; + } + + /** {@inheritDoc} */ + @Override public JavaTypeFactory getTypeFactory() { + return typeFactory; + } + + /** {@inheritDoc} */ + @Override public QueryProvider getQueryProvider() { + return queryProvider; + } + + /** {@inheritDoc} */ + @Override public Object get(String name) { + return params.get(name); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java new file mode 100644 index 0000000..54ac726 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.prepare; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelDistributionTraitDef; +import org.apache.calcite.rel.RelDistributions; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.tools.ValidationException; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase; +import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType; +import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor; + +/** + * + */ +public class DistributedExecution implements QueryExecution { + /** */ + private final Context ctx; + + /** + * @param ctx Query context. + */ + public DistributedExecution(Context ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public FieldsQueryCursor<List<?>> execute() { + CalciteQueryProcessor proc = Objects.requireNonNull(ctx.unwrap(CalciteQueryProcessor.class)); + Query query = Objects.requireNonNull(ctx.unwrap(Query.class)); + + RelTraitDef[] traitDefs = { + RelDistributionTraitDef.INSTANCE, + ConventionTraitDef.INSTANCE, + RelCollationTraitDef.INSTANCE + }; + + RelRoot relRoot; + + try (IgnitePlanner planner = proc.planner(traitDefs, ctx)) { + // Parse + SqlNode sqlNode = planner.parse(query.sql()); + + // Validate + sqlNode = planner.validate(sqlNode); + + // Convert to Relational operators graph + relRoot = planner.rel(sqlNode); + + RelNode rel = relRoot.rel; + + // Transformation chain + rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet()); + + RelTraitSet desired = rel.getTraitSet() + .replace(relRoot.collation) + .replace(IgniteRel.LOGICAL_CONVENTION) + .replace(RelDistributions.ANY) + .simplify(); + + rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired); + + relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind()); + } catch (SqlParseException | ValidationException e) { + String msg = "Failed to parse query."; + + proc.log().error(msg, e); + + throw new IgniteSQLException(msg, IgniteQueryErrorCode.PARSING, e); + } catch (Exception e) { + String msg = "Failed to create query execution graph."; + + proc.log().error(msg, e); + + throw new IgniteSQLException(msg, IgniteQueryErrorCode.UNKNOWN, e); + } + + // TODO physical plan. + + return new ListFieldsQueryCursor<>(relRoot.rel.getRowType(), Linq4j.emptyEnumerable(), Arrays::asList); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java new file mode 100644 index 0000000..b9bb2ed --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.prepare; + +import com.google.common.collect.ImmutableList; +import java.io.Reader; +import java.util.List; +import java.util.Properties; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.config.CalciteConnectionProperty; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCostImpl; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.plan.volcano.VolcanoPlanner; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.RelShuttleImpl; +import org.apache.calcite.rel.core.TableFunctionScan; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataProvider; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexExecutor; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql2rel.RelDecorrelator; +import org.apache.calcite.sql2rel.SqlRexConvertletTable; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.Program; +import org.apache.calcite.tools.Programs; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RuleSet; +import org.apache.calcite.tools.ValidationException; +import org.apache.calcite.util.Pair; +import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase; +import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType; + +/** + * + */ +public class IgnitePlanner implements Planner, RelOptTable.ViewExpander { + private final SqlOperatorTable operatorTable; + private final ImmutableList<Program> programs; + private final FrameworkConfig frameworkConfig; + private final Context context; + private final CalciteConnectionConfig connectionConfig; + private final ImmutableList<RelTraitDef> traitDefs; + private final SqlParser.Config parserConfig; + private final SqlToRelConverter.Config sqlToRelConverterConfig; + private final SqlRexConvertletTable convertletTable; + private final RexExecutor executor; + private final SchemaPlus defaultSchema; + private final JavaTypeFactory typeFactory; + private final RelMetadataProvider metadataProvider; + + private boolean open; + + private RelOptPlanner planner; + private SqlValidator validator; + + /** + * @param config Framework config. + */ + public IgnitePlanner(FrameworkConfig config) { + frameworkConfig = config; + defaultSchema = config.getDefaultSchema(); + operatorTable = config.getOperatorTable(); + programs = config.getPrograms(); + parserConfig = config.getParserConfig(); + sqlToRelConverterConfig = config.getSqlToRelConverterConfig(); + traitDefs = config.getTraitDefs(); + convertletTable = config.getConvertletTable(); + executor = config.getExecutor(); + context = config.getContext(); + connectionConfig = connConfig(); + metadataProvider = DefaultRelMetadataProvider.INSTANCE; // TODO: right costs + + RelDataTypeSystem typeSystem = connectionConfig + .typeSystem(RelDataTypeSystem.class, RelDataTypeSystem.DEFAULT); + + typeFactory = new JavaTypeFactoryImpl(typeSystem); + } + + private CalciteConnectionConfig connConfig() { + CalciteConnectionConfig unwrapped = context.unwrap(CalciteConnectionConfig.class); + if (unwrapped != null) + return unwrapped; + + Properties properties = new Properties(); + properties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), + String.valueOf(parserConfig.caseSensitive())); + properties.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), + String.valueOf(frameworkConfig.getParserConfig().conformance())); + return new CalciteConnectionConfigImpl(properties); + } + + /** {@inheritDoc} */ + @Override public RelTraitSet getEmptyTraitSet() { + return planner.emptyTraitSet(); + } + + /** {@inheritDoc} */ + @Override public void close() { + reset(); + } + + /** {@inheritDoc} */ + @Override public void reset() { + planner = null; + validator = null; + + open = false; + } + + private void ready() { + if (!open) { + planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), context); + planner.setExecutor(executor); + + validator = new IgniteSqlValidator(operatorTable, createCatalogReader(), typeFactory, conformance()); + validator.setIdentifierExpansion(true); + + for (RelTraitDef def : traitDefs) { + planner.addRelTraitDef(def); + } + + open = true; + } + } + + /** {@inheritDoc} */ + @Override public SqlNode parse(Reader reader) throws SqlParseException { + return SqlParser.create(reader, parserConfig).parseStmt(); + } + + /** {@inheritDoc} */ + @Override public SqlNode validate(SqlNode sqlNode) throws ValidationException { + ready(); + + try { + return validator.validate(sqlNode); + } + catch (RuntimeException e) { + throw new ValidationException(e); + } + } + + /** {@inheritDoc} */ + @Override public Pair<SqlNode, RelDataType> validateAndGetType(SqlNode sqlNode) throws ValidationException { + ready(); + + SqlNode validatedNode = validate(sqlNode); + RelDataType type = validator.getValidatedNodeType(validatedNode); + return Pair.of(validatedNode, type); + } + + /** {@inheritDoc} */ + @Override public RelNode convert(SqlNode sql) { + return rel(sql).rel; + } + + /** {@inheritDoc} */ + @Override public RelRoot rel(SqlNode sql) { + ready(); + + RexBuilder rexBuilder = createRexBuilder(); + RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder); + SqlToRelConverter.Config config = SqlToRelConverter.configBuilder() + .withConfig(sqlToRelConverterConfig) + .withTrimUnusedFields(false) + .withConvertTableAccess(false) + .build(); + SqlToRelConverter sqlToRelConverter = + new SqlToRelConverter(this, validator, createCatalogReader(), cluster, convertletTable, config); + RelRoot root = sqlToRelConverter.convertQuery(sql, false, true); + root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)); + RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster, null); + root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder)); + return root; + } + + /** {@inheritDoc} */ + @Override public RelRoot expandView(RelDataType rowType, String queryString, List<String> schemaPath, List<String> viewPath) { + ready(); + + SqlParser parser = SqlParser.create(queryString, parserConfig); + SqlNode sqlNode; + try { + sqlNode = parser.parseQuery(); + } + catch (SqlParseException e) { + throw new RuntimeException("parse failed", e); + } + + SqlConformance conformance = conformance(); + CalciteCatalogReader catalogReader = + createCatalogReader().withSchemaPath(schemaPath); + SqlValidator validator = new IgniteSqlValidator(operatorTable, catalogReader, typeFactory, conformance); + validator.setIdentifierExpansion(true); + + RexBuilder rexBuilder = createRexBuilder(); + RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder); + SqlToRelConverter.Config config = SqlToRelConverter + .configBuilder() + .withConfig(sqlToRelConverterConfig) + .withTrimUnusedFields(false) + .withConvertTableAccess(false) + .build(); + SqlToRelConverter sqlToRelConverter = + new SqlToRelConverter(this, validator, + catalogReader, cluster, convertletTable, config); + + RelRoot root = sqlToRelConverter.convertQuery(sqlNode, true, false); + RelRoot root2 = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)); + RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster, null); + return root2.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder)); + } + + /** {@inheritDoc} */ + @Override public RelNode transform(int programIdx, RelTraitSet targetTraits, RelNode rel) { + ready(); + + RelTraitSet toTraits = targetTraits.simplify(); + + rel.accept(new MetaDataProviderModifier(metadataProvider)); + + return programs.get(programIdx).run(planner, rel, toTraits, ImmutableList.of(), ImmutableList.of()); + } + + public RelNode transform(PlannerType plannerType, PlannerPhase plannerPhase, RelNode input, RelTraitSet targetTraits) { + ready(); + + RelTraitSet toTraits = targetTraits.simplify(); + RuleSet rules = plannerPhase.getRules(context); + + input.accept(new MetaDataProviderModifier(metadataProvider)); + + RelNode output; + + switch (plannerType) { + case HEP: + final HepProgramBuilder programBuilder = new HepProgramBuilder(); + + for (RelOptRule rule : rules) { + programBuilder.addRuleInstance(rule); + } + + final HepPlanner hepPlanner = + new HepPlanner(programBuilder.build(), context, true, null, RelOptCostImpl.FACTORY); + + hepPlanner.setRoot(input); + + if (!input.getTraitSet().equals(targetTraits)) + hepPlanner.changeTraits(input, toTraits); + + output = hepPlanner.findBestExp(); + + break; + case VOLCANO: + Program program = Programs.of(rules); + + output = program.run(planner, input, toTraits, + ImmutableList.of(), ImmutableList.of()); + + break; + default: + throw new AssertionError("Unknown planner type: " + plannerType); + } + + return output; + } + + /** {@inheritDoc} */ + @Override public JavaTypeFactory getTypeFactory() { + return typeFactory; + } + + private SqlConformance conformance() { + return connectionConfig.conformance(); + } + + private RexBuilder createRexBuilder() { + return new RexBuilder(typeFactory); + } + + private CalciteCatalogReader createCatalogReader() { + SchemaPlus rootSchema = rootSchema(defaultSchema); + + return new CalciteCatalogReader( + CalciteSchema.from(rootSchema), + CalciteSchema.from(defaultSchema).path(null), + typeFactory, connectionConfig); + } + + private static SchemaPlus rootSchema(SchemaPlus schema) { + for (; ; ) { + if (schema.getParentSchema() == null) { + return schema; + } + schema = schema.getParentSchema(); + } + } + + /** */ + private static class MetaDataProviderModifier extends RelShuttleImpl { + /** */ + private final RelMetadataProvider metadataProvider; + + /** */ + private MetaDataProviderModifier(RelMetadataProvider metadataProvider) { + this.metadataProvider = metadataProvider; + } + + /** {@inheritDoc} */ + @Override public RelNode visit(TableScan scan) { + scan.getCluster().setMetadataProvider(metadataProvider); + return super.visit(scan); + } + + /** {@inheritDoc} */ + @Override public RelNode visit(TableFunctionScan scan) { + scan.getCluster().setMetadataProvider(metadataProvider); + return super.visit(scan); + } + + /** {@inheritDoc} */ + @Override public RelNode visit(LogicalValues values) { + values.getCluster().setMetadataProvider(metadataProvider); + return super.visit(values); + } + + /** {@inheritDoc} */ + @Override protected RelNode visitChild(RelNode parent, int i, RelNode child) { + child.accept(this); + parent.getCluster().setMetadataProvider(metadataProvider); + return parent; + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java new file mode 100644 index 0000000..74bd1df --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.prepare; + +/** + * + */ + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlValidatorImpl; + +/** Validator. */ +public class IgniteSqlValidator extends SqlValidatorImpl { + public IgniteSqlValidator(SqlOperatorTable opTab, + CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory, + SqlConformance conformance) { + super(opTab, catalogReader, typeFactory, conformance); + } + + /** {@inheritDoc} */ + @Override protected RelDataType getLogicalSourceRowType( + RelDataType sourceRowType, SqlInsert insert) { + final RelDataType superType = + super.getLogicalSourceRowType(sourceRowType, insert); + return ((JavaTypeFactory) typeFactory).toSql(superType); + } + + /** {@inheritDoc} */ + @Override protected RelDataType getLogicalTargetRowType( + RelDataType targetRowType, SqlInsert insert) { + final RelDataType superType = + super.getLogicalTargetRowType(targetRowType, insert); + return ((JavaTypeFactory) typeFactory).toSql(superType); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java new file mode 100644 index 0000000..92b974e --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Query.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.prepare; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.internal.util.typedef.F; + +/** + * + */ +public class Query { + private final String sql; + private final Object[] params; + + public Query(String sql, Object[] params) { + this.sql = sql; + this.params = params; + } + + public String sql() { + return sql; + } + + public Object[] params() { + return params; + } + + public Map<String, Object> params(Map<String, Object> stashed) { + Map<String, Object> res = new HashMap<>(stashed); + if (!F.isEmpty(params)) { + for (int i = 0; i < params.length; i++) { + res.put("?" + i, params[i]); + } + } + return res; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryExecution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryExecution.java new file mode 100644 index 0000000..bcfed58 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryExecution.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.prepare; + +import java.util.List; +import org.apache.ignite.cache.query.FieldsQueryCursor; + +/** + * + */ +public interface QueryExecution { + FieldsQueryCursor<List<?>> execute(); +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java new file mode 100644 index 0000000..995ad70 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rel; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; + +/** + * + */ +public interface IgniteRel extends RelNode { + Convention LOGICAL_CONVENTION = new Convention.Impl("IGNITE_LOGICAL", IgniteRel.class) { + /** */ + @Override public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) { + return true; // Enables trait definition conversion + } + }; + + default void visit(IgniteVisitor visitor) { + visitor.visit(this); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java new file mode 100644 index 0000000..a89c034 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteVisitor.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rel; + +/** + * + */ +public interface IgniteVisitor { + public void visit(IgniteRel rel); +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java new file mode 100644 index 0000000..638cfb1 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query.calcite.rel.logical; + +import java.util.Objects; +import java.util.Set; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; + +public final class IgniteLogicalFilter extends Filter implements IgniteRel { + private final Set<CorrelationId> variablesSet; + + public IgniteLogicalFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, + RexNode condition, Set<CorrelationId> variablesSet) { + super(cluster, traitSet, child, condition); + this.variablesSet = Objects.requireNonNull(variablesSet); + } + + @Override public Set<CorrelationId> getVariablesSet() { + return variablesSet; + } + + @Override public IgniteLogicalFilter copy(RelTraitSet traitSet, RelNode input, + RexNode condition) { + return new IgniteLogicalFilter(getCluster(), traitSet, input, condition, + variablesSet); + } + + @Override public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("variablesSet", variablesSet, !variablesSet.isEmpty()); + } +} \ No newline at end of file diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java new file mode 100644 index 0000000..5abfc32 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalJoin.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query.calcite.rel.logical; + +import java.util.Set; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexNode; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; + +public final class IgniteLogicalJoin extends Join implements IgniteRel { + private final boolean semiJoinDone; + + public IgniteLogicalJoin( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode left, + RelNode right, + RexNode condition, + Set<CorrelationId> variablesSet, + JoinRelType joinType, + boolean semiJoinDone) { + super(cluster, traitSet, left, right, condition, variablesSet, joinType); + this.semiJoinDone = semiJoinDone; + } + + @Override public IgniteLogicalJoin copy(RelTraitSet traitSet, RexNode conditionExpr, + RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { + return new IgniteLogicalJoin(getCluster(), + getCluster().traitSetOf(IgniteRel.LOGICAL_CONVENTION), left, right, conditionExpr, + variablesSet, joinType, semiJoinDone); + } + + @Override public RelWriter explainTerms(RelWriter pw) { + // Don't ever print semiJoinDone=false. This way, we + // don't clutter things up in optimizers that don't use semi-joins. + return super.explainTerms(pw) + .itemIf("semiJoinDone", semiJoinDone, semiJoinDone); + } + + @Override public boolean isSemiJoinDone() { + return semiJoinDone; + } +} \ No newline at end of file diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java new file mode 100644 index 0000000..6f467a9 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query.calcite.rel.logical; + +import java.util.List; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; + +public final class IgniteLogicalProject extends Project implements IgniteRel { + public IgniteLogicalProject( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<? extends RexNode> projects, + RelDataType rowType) { + super(cluster, traitSet, input, projects, rowType); + } + + @Override public IgniteLogicalProject copy(RelTraitSet traitSet, RelNode input, + List<RexNode> projects, RelDataType rowType) { + return new IgniteLogicalProject(getCluster(), traitSet, input, projects, rowType); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java new file mode 100644 index 0000000..664e2db --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query.calcite.rel.logical; + +import java.util.List; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableScan; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; + +public final class IgniteLogicalTableScan extends TableScan implements IgniteRel { + public IgniteLogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet, + RelOptTable table) { + super(cluster, traitSet, table); + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return this; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java new file mode 100644 index 0000000..477b747 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.volcano.AbstractConverter; +import org.apache.calcite.rel.rules.AbstractMaterializedViewRule; +import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule; +import org.apache.calcite.rel.rules.AggregateJoinTransposeRule; +import org.apache.calcite.rel.rules.AggregateMergeRule; +import org.apache.calcite.rel.rules.AggregateProjectMergeRule; +import org.apache.calcite.rel.rules.AggregateProjectPullUpConstantsRule; +import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule; +import org.apache.calcite.rel.rules.AggregateRemoveRule; +import org.apache.calcite.rel.rules.AggregateStarTableRule; +import org.apache.calcite.rel.rules.AggregateValuesRule; +import org.apache.calcite.rel.rules.CalcRemoveRule; +import org.apache.calcite.rel.rules.DateRangeRules; +import org.apache.calcite.rel.rules.ExchangeRemoveConstantKeysRule; +import org.apache.calcite.rel.rules.FilterAggregateTransposeRule; +import org.apache.calcite.rel.rules.FilterJoinRule; +import org.apache.calcite.rel.rules.FilterMergeRule; +import org.apache.calcite.rel.rules.FilterProjectTransposeRule; +import org.apache.calcite.rel.rules.FilterTableScanRule; +import org.apache.calcite.rel.rules.IntersectToDistinctRule; +import org.apache.calcite.rel.rules.JoinCommuteRule; +import org.apache.calcite.rel.rules.JoinPushExpressionsRule; +import org.apache.calcite.rel.rules.JoinPushThroughJoinRule; +import org.apache.calcite.rel.rules.MaterializedViewFilterScanRule; +import org.apache.calcite.rel.rules.ProjectFilterTransposeRule; +import org.apache.calcite.rel.rules.ProjectMergeRule; +import org.apache.calcite.rel.rules.ProjectRemoveRule; +import org.apache.calcite.rel.rules.ProjectToWindowRule; +import org.apache.calcite.rel.rules.ProjectWindowTransposeRule; +import org.apache.calcite.rel.rules.PruneEmptyRules; +import org.apache.calcite.rel.rules.ReduceExpressionsRule; +import org.apache.calcite.rel.rules.SemiJoinRule; +import org.apache.calcite.rel.rules.SortJoinTransposeRule; +import org.apache.calcite.rel.rules.SortProjectTransposeRule; +import org.apache.calcite.rel.rules.SortRemoveConstantKeysRule; +import org.apache.calcite.rel.rules.SortRemoveRule; +import org.apache.calcite.rel.rules.SortUnionTransposeRule; +import org.apache.calcite.rel.rules.SubQueryRemoveRule; +import org.apache.calcite.rel.rules.TableScanRule; +import org.apache.calcite.rel.rules.UnionMergeRule; +import org.apache.calcite.rel.rules.UnionPullUpConstantsRule; +import org.apache.calcite.rel.rules.UnionToDistinctRule; +import org.apache.calcite.rel.rules.ValuesReduceRule; +import org.apache.ignite.internal.processors.query.calcite.rule.logical.IgniteFilterRule; +import org.apache.ignite.internal.processors.query.calcite.rule.logical.IgniteJoinRule; +import org.apache.ignite.internal.processors.query.calcite.rule.logical.IgniteProjectRule; + +/** + * + */ +public class IgniteRules { + public static final List<RelOptRule> BASE_RULES = ImmutableList.of( + AggregateStarTableRule.INSTANCE, + AggregateStarTableRule.INSTANCE2, + TableScanRule.INSTANCE, + ProjectMergeRule.INSTANCE, + FilterTableScanRule.INSTANCE, + ProjectFilterTransposeRule.INSTANCE, + FilterProjectTransposeRule.INSTANCE, + FilterJoinRule.FILTER_ON_JOIN, + JoinPushExpressionsRule.INSTANCE, + AggregateExpandDistinctAggregatesRule.INSTANCE, + AggregateReduceFunctionsRule.INSTANCE, + FilterAggregateTransposeRule.INSTANCE, + ProjectWindowTransposeRule.INSTANCE, + JoinCommuteRule.INSTANCE, + JoinPushThroughJoinRule.RIGHT, + JoinPushThroughJoinRule.LEFT, + SortProjectTransposeRule.INSTANCE, + SortJoinTransposeRule.INSTANCE, + SortRemoveConstantKeysRule.INSTANCE, + SortUnionTransposeRule.INSTANCE, + ExchangeRemoveConstantKeysRule.EXCHANGE_INSTANCE, + ExchangeRemoveConstantKeysRule.SORT_EXCHANGE_INSTANCE); + + public static final List<RelOptRule> ABSTRACT_RULES = ImmutableList.of( + AggregateProjectPullUpConstantsRule.INSTANCE2, + UnionPullUpConstantsRule.INSTANCE, + PruneEmptyRules.UNION_INSTANCE, + PruneEmptyRules.INTERSECT_INSTANCE, + PruneEmptyRules.MINUS_INSTANCE, + PruneEmptyRules.PROJECT_INSTANCE, + PruneEmptyRules.FILTER_INSTANCE, + PruneEmptyRules.SORT_INSTANCE, + PruneEmptyRules.AGGREGATE_INSTANCE, + PruneEmptyRules.JOIN_LEFT_INSTANCE, + PruneEmptyRules.JOIN_RIGHT_INSTANCE, + PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE, + UnionMergeRule.INSTANCE, + UnionMergeRule.INTERSECT_INSTANCE, + UnionMergeRule.MINUS_INSTANCE, + ProjectToWindowRule.PROJECT, + FilterMergeRule.INSTANCE, + DateRangeRules.FILTER_INSTANCE, + IntersectToDistinctRule.INSTANCE); + + public static final List<RelOptRule> ABSTRACT_RELATIONAL_RULES = ImmutableList.of( + FilterJoinRule.FILTER_ON_JOIN, + FilterJoinRule.JOIN, + AbstractConverter.ExpandConversionRule.INSTANCE, + JoinCommuteRule.INSTANCE, + SemiJoinRule.PROJECT, + SemiJoinRule.JOIN, + AggregateRemoveRule.INSTANCE, + UnionToDistinctRule.INSTANCE, + ProjectRemoveRule.INSTANCE, + AggregateJoinTransposeRule.INSTANCE, + AggregateMergeRule.INSTANCE, + AggregateProjectMergeRule.INSTANCE, + CalcRemoveRule.INSTANCE, + SortRemoveRule.INSTANCE); + + public static final List<RelOptRule> CONSTANT_REDUCTION_RULES = ImmutableList.of( + ReduceExpressionsRule.PROJECT_INSTANCE, + ReduceExpressionsRule.FILTER_INSTANCE, + ReduceExpressionsRule.CALC_INSTANCE, + ReduceExpressionsRule.WINDOW_INSTANCE, + ReduceExpressionsRule.JOIN_INSTANCE, + ValuesReduceRule.FILTER_INSTANCE, + ValuesReduceRule.PROJECT_FILTER_INSTANCE, + ValuesReduceRule.PROJECT_INSTANCE, + AggregateValuesRule.INSTANCE); + + public static final List<RelOptRule> MATERIALIZATION_RULES = ImmutableList.of( + MaterializedViewFilterScanRule.INSTANCE, + AbstractMaterializedViewRule.INSTANCE_PROJECT_FILTER, + AbstractMaterializedViewRule.INSTANCE_FILTER, + AbstractMaterializedViewRule.INSTANCE_PROJECT_JOIN, + AbstractMaterializedViewRule.INSTANCE_JOIN, + AbstractMaterializedViewRule.INSTANCE_PROJECT_AGGREGATE, + AbstractMaterializedViewRule.INSTANCE_AGGREGATE); + + public static final List<RelOptRule> SUBQUERY_REWRITE_RULES = ImmutableList.of( + SubQueryRemoveRule.FILTER, + SubQueryRemoveRule.PROJECT, + SubQueryRemoveRule.JOIN); + + public static final List<RelOptRule> IGNITE_BASE_RULES = ImmutableList.of( + IgniteFilterRule.INSTANCE, + IgniteProjectRule.INSTANCE, + IgniteJoinRule.INSTANCE + ); + + public static List<RelOptRule> logicalRules(Context ctx) { + return ImmutableList.<RelOptRule>builder() + .addAll(BASE_RULES) + .addAll(ABSTRACT_RULES) + .addAll(ABSTRACT_RELATIONAL_RULES) + .addAll(IGNITE_BASE_RULES) + .build(); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java new file mode 100644 index 0000000..aa82187 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import org.apache.calcite.plan.Context; +import org.apache.calcite.tools.RuleSet; +import org.apache.calcite.tools.RuleSets; + +/** + * + */ +public enum PlannerPhase { + /** */ + SUBQUERY_REWRITE("Sub-queries rewrites") { + @Override public RuleSet getRules(Context ctx) { + return RuleSets.ofList(IgniteRules.SUBQUERY_REWRITE_RULES); + } + }, + + /** */ + LOGICAL("Logical planning") { + @Override public RuleSet getRules(Context ctx) { + return RuleSets.ofList(IgniteRules.logicalRules(ctx)); + } + }; + + public final String description; + + PlannerPhase(String description) { + this.description = description; + } + + public abstract RuleSet getRules(Context ctx); +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerType.java new file mode 100644 index 0000000..a4a8db8 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +/** + * + */ +public enum PlannerType { + HEP, + VOLCANO; +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteFilterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteFilterRule.java new file mode 100644 index 0000000..bdda595 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteFilterRule.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule.logical; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.metadata.RelMdCollation; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalFilter; + +/** + * + */ +public class IgniteFilterRule extends RelOptRule { + public static final RelOptRule INSTANCE = new IgniteFilterRule(); + + private IgniteFilterRule() { + super(operand(LogicalFilter.class, Convention.NONE, any()), RelFactories.LOGICAL_BUILDER, "IgniteFilterRule"); + } + + @Override public void onMatch(RelOptRuleCall call) { + LogicalFilter filter = call.rel(0); + final RelNode input = filter.getInput(); + final RelOptCluster cluster = input.getCluster(); + final RelMetadataQuery mq = cluster.getMetadataQuery(); + final RelTraitSet traitSet = cluster.traitSet() + .replace(IgniteRel.LOGICAL_CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + () -> RelMdCollation.filter(mq, input)); + RelNode convertedInput = convert(input, traitSet); + call.transformTo(new IgniteLogicalFilter(cluster, traitSet, convertedInput, filter.getCondition(), filter.getVariablesSet())); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java new file mode 100644 index 0000000..44009a8 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule.logical; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin; + +/** + * + */ +public class IgniteJoinRule extends RelOptRule { + public static final RelOptRule INSTANCE = new IgniteJoinRule(); + + public IgniteJoinRule() { + super(operand(LogicalJoin.class, Convention.NONE, any()), RelFactories.LOGICAL_BUILDER, "IgniteJoinRule"); + } + + @Override public void onMatch(RelOptRuleCall call) { + LogicalJoin join = call.rel(0); + RelOptCluster cluster = join.getCluster(); + RelTraitSet traitSet = cluster.traitSet() + .replace(IgniteRel.LOGICAL_CONVENTION); + RelNode left = convert(join.getLeft(), traitSet); + RelNode right = convert(join.getRight(), traitSet); + call.transformTo(new IgniteLogicalJoin(cluster, traitSet, left, right, + join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone())); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteProjectRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteProjectRule.java new file mode 100644 index 0000000..043085f --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteProjectRule.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule.logical; + +import java.util.List; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.metadata.RelMdCollation; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalProject; + +/** + * + */ +public class IgniteProjectRule extends RelOptRule { + public static final RelOptRule INSTANCE = new IgniteProjectRule(); + + private <R extends RelNode> IgniteProjectRule() { + super(operand(LogicalProject.class, Convention.NONE, any()), RelFactories.LOGICAL_BUILDER, "IgniteProjectRule"); + } + + @Override public void onMatch(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + final RelNode input = project.getInput(); + final RelOptCluster cluster = input.getCluster(); + final List<? extends RexNode> projects = project.getProjects(); + final RelMetadataQuery mq = cluster.getMetadataQuery(); + final RelTraitSet traitSet = + cluster.traitSet() + .replace(IgniteRel.LOGICAL_CONVENTION) +// .replaceIf(RelDistributionTraitDef.INSTANCE, +// () -> RelMdDistribution.project(mq, input, projects)) + .replaceIfs(RelCollationTraitDef.INSTANCE, + () -> RelMdCollation.project(mq, input, projects)); + RelNode convertedInput = convert(input, traitSet); + call.transformTo(new IgniteLogicalProject(cluster, traitSet, convertedInput, projects, project.getRowType())); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaChangeListener.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaChangeListener.java new file mode 100644 index 0000000..67cca7c --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaChangeListener.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.schema; + +import java.util.HashMap; +import java.util.Map; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.tools.Frameworks; +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener; + +/** + * + */ +public class CalciteSchemaChangeListener implements SchemaChangeListener { + private final Map<String, IgniteSchema> schemas = new HashMap<>(); + private final CalciteSchemaHolder schemaHolder; + + public CalciteSchemaChangeListener(CalciteSchemaHolder schemaHolder) { + this.schemaHolder = schemaHolder; + } + + @Override public synchronized void onSchemaCreate(String schemaName) { + schemas.putIfAbsent(schemaName, new IgniteSchema(schemaName)); + rebuild(); + } + + @Override public synchronized void onSchemaDrop(String schemaName) { + schemas.remove(schemaName); + rebuild(); + } + + @Override public synchronized void onSqlTypeCreate(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) { + schemas.computeIfAbsent(schemaName, IgniteSchema::new).onSqlTypeCreate(typeDescriptor, cacheInfo); + rebuild(); + } + + @Override public synchronized void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) { + schemas.computeIfAbsent(schemaName, IgniteSchema::new).onSqlTypeDrop(typeDescriptor, cacheInfo); + rebuild(); + } + + public void rebuild() { + SchemaPlus schema = Frameworks.createRootSchema(false); + schemas.forEach(schema::add); + schemaHolder.schema(schema); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java new file mode 100644 index 0000000..3978ae5 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.schema; + +import java.util.Objects; +import org.apache.calcite.schema.SchemaPlus; + +/** + * + */ +public class CalciteSchemaHolder implements SchemaProvider { + private volatile SchemaPlus schema; + + @Override public SchemaPlus schema() { + return Objects.requireNonNull(schema); + } + + /** + * @param schema Calcite schema. + */ + public void schema(SchemaPlus schema) { + this.schema = schema; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java new file mode 100644 index 0000000..bf95938 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.schema; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; + +/** + * + */ +public class IgniteSchema extends AbstractSchema { + /** */ + private final String schemaName; + + /** */ + private final Map<String, Table> tableMap = new ConcurrentHashMap<>(); + + public IgniteSchema(String schemaName) { + this.schemaName = schemaName; + } + + public String getName() { + return schemaName; + } + + @Override protected Map<String, Table> getTableMap() { + return Collections.unmodifiableMap(tableMap); + } + + /** + * Callback method. + * + * @param typeDesc Query type descriptor. + * @param cacheInfo Cache info. + */ + public void onSqlTypeCreate(GridQueryTypeDescriptor typeDesc, GridCacheContextInfo cacheInfo) { + IgniteTable table = new IgniteTable(typeDesc.tableName(), cacheInfo.name(), Commons.rowTypeFunction(typeDesc), null); + + addTable(table.tableName(), table); + } + + /** + * Callback method. + * + * @param typeDesc Query type descriptor. + * @param cacheInfo Cache info. + */ + public void onSqlTypeDrop(GridQueryTypeDescriptor typeDesc, GridCacheContextInfo cacheInfo) { + removeTable(typeDesc.tableName()); + } + + /** + * @param name Table name. + * @param table Table. + */ + public void addTable(String name, Table table) { + tableMap.put(name, table); + } + + /** + * @param tableName Table name. + */ + public void removeTable(String tableName) { + tableMap.remove(tableName); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java new file mode 100644 index 0000000..0bd4e51 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.schema; + +import java.util.function.Function; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelDistributionTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; +import org.jetbrains.annotations.Nullable; + +/** */ +public class IgniteTable extends AbstractTable implements TranslatableTable { + private final String tableName; + private final String cacheName; + private final Function<RelDataTypeFactory, RelDataType> rowType; + private final Statistic statistic; + + + public IgniteTable(String tableName, String cacheName, + Function<RelDataTypeFactory, RelDataType> rowType, @Nullable Statistic statistic) { + this.tableName = tableName; + this.cacheName = cacheName; + this.rowType = rowType; + + this.statistic = statistic == null ? Statistics.UNKNOWN : statistic; + } + + /** + * @return Table name; + */ + public String tableName() { + return tableName; + } + + /** + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** {@inheritDoc} */ + @Override public Statistic getStatistic() { + return statistic; + } + + /** {@inheritDoc} */ + @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return rowType.apply(typeFactory); + } + + /** {@inheritDoc} */ + @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { + RelOptCluster cluster = context.getCluster(); + RelTraitSet traitSet = cluster.traitSet() + .replace(IgniteRel.LOGICAL_CONVENTION) + .replaceIf(RelDistributionTraitDef.INSTANCE, () -> getStatistic().getDistribution()) + .replaceIfs(RelCollationTraitDef.INSTANCE, () -> getStatistic().getCollations()); + return new IgniteLogicalTableScan(cluster, traitSet, relOptTable); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaProvider.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaProvider.java new file mode 100644 index 0000000..5fdd311 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaProvider.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.schema; + +import org.apache.calcite.schema.SchemaPlus; + +/** + * + */ +public interface SchemaProvider { + SchemaPlus schema(); +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java new file mode 100644 index 0000000..8c6cc61 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.schema; + +/** + * + */ +public interface TableDescriptor { + public boolean partitioned(); + + public boolean replicated(); +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java new file mode 100644 index 0000000..8758eb2 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.util; + +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.processors.query.QueryContext; +import org.apache.ignite.internal.processors.query.QueryUtils; + +/** + * + */ +public final class Commons { + private Commons(){} + + public static Context convert(QueryContext ctx) { + return ctx == null ? Contexts.empty() : Contexts.of(ctx.unwrap(Object[].class)); + } + + public static <T> Predicate<T> any() { + return obj -> true; + } + + /** */ + public static Function<RelDataTypeFactory, RelDataType> rowTypeFunction(GridQueryTypeDescriptor desc) { + return (f) -> { + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); + + builder.add(QueryUtils.KEY_FIELD_NAME, f.createJavaType(desc.keyClass())); + builder.add(QueryUtils.VAL_FIELD_NAME, f.createJavaType(desc.valueClass())); + + for (Map.Entry<String, Class<?>> prop : desc.fields().entrySet()) { + builder.add(prop.getKey(), f.createJavaType(prop.getValue())); + } + return builder.build(); + }; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java new file mode 100644 index 0000000..1b88f00 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.util; + +import java.lang.reflect.Method; +import org.apache.calcite.linq4j.tree.Types; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; + +/** + * + */ +public enum IgniteMethod { + IGNITE_TABLE_ENUMERABLE(IgniteTable.class, "enumerable"); + + private final Method method; + + IgniteMethod(Class clazz, String methodName, Class... argumentTypes) { + method = Types.lookupMethod(clazz, methodName, argumentTypes); + } + + /** */ + public Method method() { + return method; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java new file mode 100644 index 0000000..c46e762 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.util; + +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; + +/** + * + */ +public class ListFieldsQueryCursor<T> implements FieldsQueryCursor<List<?>> { + /** */ + private final RelDataType rowType; + + /** */ + private final Enumerable<T> enumerable; + + /** */ + private final Function<T, List<?>> converter; + + /** */ + private Iterator<T> it; + + /** + * @param rowType Row data type description. + * @param enumerable Rows source. + * @param converter Row converter. + */ + public ListFieldsQueryCursor(RelDataType rowType, Enumerable<T> enumerable, Function<T, List<?>> converter) { + this.rowType = rowType; + this.enumerable = enumerable; + this.converter = converter; + } + + /** {@inheritDoc} */ + @Override public String getFieldName(int idx) { + return rowType.getFieldList().get(idx).getName(); + } + + /** {@inheritDoc} */ + @Override public int getColumnsCount() { + return rowType.getFieldCount(); + } + + /** {@inheritDoc} */ + @Override public List<List<?>> getAll() { + return StreamSupport.stream(enumerable.spliterator(), false) + .map(converter) + .collect(Collectors.toList()); + } + + /** {@inheritDoc} */ + @Override public void close() { + closeIterator(); + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator<List<?>> iterator() { + closeIterator(); + + return F.iterator(it = enumerable.iterator(), converter::apply, true); + } + + private void closeIterator() { + if (it instanceof AutoCloseable) + U.closeQuiet((AutoCloseable)it); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java new file mode 100644 index 0000000..7d22d51 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.util; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.lang.GridCursor; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID; + +/** + * + */ +public class ScanIterator<T> extends GridCloseableIteratorAdapter<T> { + private final int cacheId; + private final Iterator<GridDhtLocalPartition> parts; + private final Function<CacheDataRow, T> typeWrapper; + private final Predicate<CacheDataRow> typeFilter; + + /** + * + */ + private GridCursor<? extends CacheDataRow> cur; + /** + * + */ + private GridDhtLocalPartition curPart; + + /** + * + */ + private T next; + + public ScanIterator(int cacheId, Iterator<GridDhtLocalPartition> parts, Function<CacheDataRow, T> typeWrapper, + Predicate<CacheDataRow> typeFilter) { + this.cacheId = cacheId; + this.parts = parts; + this.typeWrapper = typeWrapper; + this.typeFilter = typeFilter; + } + + @Override + protected T onNext() { + if (next == null) + throw new NoSuchElementException(); + + T next = this.next; + + this.next = null; + + return next; + } + + @Override + protected boolean onHasNext() throws IgniteCheckedException { + if (next != null) + return true; + + while (true) { + if (cur == null) { + if (parts.hasNext()) { + GridDhtLocalPartition part = parts.next(); + + if (!reservePartition(part)) + throw new IgniteSQLException("Failed to reserve partition, please retry on stable topology."); + + IgniteCacheOffheapManager.CacheDataStore ds = part.dataStore(); + + cur = cacheId == UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); + } else + break; + } + + if (cur.next()) { + CacheDataRow row = cur.get(); + + if (!typeFilter.test(row)) + continue; + + next = typeWrapper.apply(row); + + break; + } else { + cur = null; + + releaseCurrentPartition(); + } + } + + return next != null; + } + + /** + * + */ + private void releaseCurrentPartition() { + GridDhtLocalPartition p = curPart; + + assert p != null; + + curPart = null; + + p.release(); + } + + /** + * + */ + private boolean reservePartition(GridDhtLocalPartition p) { + if (p != null && p.reserve()) { + curPart = p; + + return true; + } + + return false; + } + + /** + * {@inheritDoc} + */ + @Override protected void onClose() { + if (curPart != null) + releaseCurrentPartition(); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java new file mode 100644 index 0000000..76da7d3 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite; + + +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelDistributionTraitDef; +import org.apache.calcite.rel.RelDistributions; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.tools.Frameworks; +import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; +import org.apache.ignite.internal.processors.query.calcite.prepare.Query; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase; +import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; +import org.apache.ignite.testframework.junits.GridTestKernalContext; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME; +import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME; + +/** + * + */ +@WithSystemProperty(key = "calcite.debug", value = "true") +public class CalciteQueryProcessorTest extends GridCommonAbstractTest { + + private static CalciteQueryProcessor proc; + + @BeforeClass + public static void setupClass() { + proc = new CalciteQueryProcessor(); + + proc.setLogger(log); + proc.start(new GridTestKernalContext(log)); + + IgniteSchema publicSchema = new IgniteSchema("PUBLIC"); + + publicSchema.addTable("Developer", new IgniteTable("Developer", "Developer", (f) -> { + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); + + builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class)); + builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class)); + builder.add("id", f.createJavaType(Integer.class)); + builder.add("name", f.createJavaType(String.class)); + builder.add("projectId", f.createJavaType(Integer.class)); + builder.add("cityId", f.createJavaType(Integer.class)); + + return builder.build(); + }, null)); + + publicSchema.addTable("Project", new IgniteTable("Project", "Project", (f) -> { + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); + + builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class)); + builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class)); + builder.add("id", f.createJavaType(Integer.class)); + builder.add("name", f.createJavaType(String.class)); + builder.add("ver", f.createJavaType(Integer.class)); + + return builder.build(); + }, null)); + + publicSchema.addTable("Country", new IgniteTable("Country", "Country", (f) -> { + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); + + builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class)); + builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class)); + builder.add("id", f.createJavaType(Integer.class)); + builder.add("name", f.createJavaType(String.class)); + builder.add("countryCode", f.createJavaType(Integer.class)); + + return builder.build(); + }, null)); + + publicSchema.addTable("City", new IgniteTable("City", "City", (f) -> { + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); + + builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class)); + builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class)); + builder.add("id", f.createJavaType(Integer.class)); + builder.add("name", f.createJavaType(String.class)); + builder.add("countryId", f.createJavaType(Integer.class)); + + return builder.build(); + }, null)); + + SchemaPlus schema = Frameworks.createRootSchema(false); + + schema.add("PUBLIC", publicSchema); + + proc.schemaHolder().schema(schema); + } + + @Test + public void testLogicalPlan() throws Exception { + String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) > ?"; + + Context ctx = proc.context(Contexts.empty(), sql, new Object[]{2}); + + assertNotNull(ctx); + + RelTraitDef[] traitDefs = { + RelDistributionTraitDef.INSTANCE, + ConventionTraitDef.INSTANCE, + RelCollationTraitDef.INSTANCE + }; + + RelRoot relRoot; + + try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ + assertNotNull(planner); + + Query query = ctx.unwrap(Query.class); + + assertNotNull(planner); + + // Parse + SqlNode sqlNode = planner.parse(query.sql()); + + // Validate + sqlNode = planner.validate(sqlNode); + + // Convert to Relational operators graph + relRoot = planner.rel(sqlNode); + + RelNode rel = relRoot.rel; + + // Transformation chain + rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet()); + + RelTraitSet desired = rel.getTraitSet() + .replace(relRoot.collation) + .replace(IgniteRel.LOGICAL_CONVENTION) + .replace(RelDistributions.ANY) + .simplify(); + + rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired); + + relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind()); + } + + assertNotNull(relRoot); + } +} \ No newline at end of file diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java new file mode 100644 index 0000000..55a5925 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * Calcite tests. + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + CalciteQueryProcessorTest.class + , +}) +public class IgniteCalciteTestSuite { } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index 5e65876..4b7a47e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import java.lang.reflect.Constructor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.compress.CompressionProcessor; +import org.apache.ignite.internal.processors.query.QueryEngine; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -97,6 +98,13 @@ public enum IgniteComponentType { CompressionProcessor.class.getName(), "org.apache.ignite.internal.processors.compress.CompressionProcessorImpl", "ignite-compress" + ), + + /** Experimental calcite based query engine. */ + QUERY_ENGINE( + QueryEngine.NoOpQueryEngine.class.getName(), + "org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor", + "ignite-calcite" ); /** No-op class name. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index dc64666..b5ced01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -118,6 +118,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA; import static org.apache.ignite.internal.IgniteComponentType.INDEXING; +import static org.apache.ignite.internal.IgniteComponentType.QUERY_ENGINE; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL; /** @@ -151,6 +152,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** */ private final @Nullable GridQueryIndexing idx; + /** */ + private final @Nullable QueryEngine qryEngine; + /** Value object context. */ private final CacheQueryObjectValueContext valCtx; @@ -210,13 +214,18 @@ public class GridQueryProcessor extends GridProcessorAdapter { public GridQueryProcessor(GridKernalContext ctx) throws IgniteCheckedException { super(ctx); + boolean inBoundIndexingEnabled = false; + if (idxCls != null) { idx = U.newInstance(idxCls); idxCls = null; } else - idx = INDEXING.inClassPath() ? U.<GridQueryIndexing>newInstance(INDEXING.className()) : null; + idx = (inBoundIndexingEnabled = INDEXING.inClassPath()) ? U.newInstance(INDEXING.className()) : null; + + // At now experimental engine uses some logic of old one and cannot work separately. + qryEngine = inBoundIndexingEnabled ? QUERY_ENGINE.createOptional() : null; valCtx = new CacheQueryObjectValueContext(ctx); @@ -239,6 +248,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { @Override public void start() throws IgniteCheckedException { super.start(); + // Have to start engine first because it registers schema listeners which has + // to be notified at the time the indexing module registers schema objects + if (qryEngine != null) { + ctx.resource().injectGeneric(qryEngine); + + qryEngine.start(ctx); + } + if (idx != null) { ctx.resource().injectGeneric(idx); @@ -689,6 +706,18 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @return Query engine. + * @throws IgniteException If module is not enabled. + */ + public QueryEngine getQueryEngine() throws IgniteException { + if (qryEngine == null) + throw new IgniteException("Failed to execute query using experimental engine (consider adding module " + + QUERY_ENGINE.module() + " to classpath or moving it from 'optional' to 'libs' folder)."); + + return qryEngine; + } + + /** * Create type descriptors from schema and initialize indexing for given cache.<p> * Use with {@link #busyLock} where appropriate. * @param cacheInfo Cache context info. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java index 6a40359..be0c62b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java @@ -83,6 +83,11 @@ public interface GridQueryTypeDescriptor { public GridQueryProperty property(String name); /** + * @return All properties. + */ + public Map<String, GridQueryProperty> properties(); + + /** * Gets indexes for this type. * * @return Indexes for this type. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java new file mode 100644 index 0000000..0a630e9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.ignite.internal.util.typedef.F; + +/** */ +public final class QueryContext { + /** */ + private static final Object[] EMPTY = {}; + /** */ + private final Object[] params; + + /** */ + private QueryContext(Object[] params) { + this.params = params; + } + + /** + * Finds an instance of an interface implemented by this object, + * or returns null if this object does not support that interface. + */ + public <C> C unwrap(Class<C> aClass) { + if (Object[].class == aClass) + return aClass.cast(params); + + return Arrays.stream(params).filter(aClass::isInstance).findFirst().map(aClass::cast).orElse(null); + } + + /** + * @param params Context parameters. + * @return Query context. + */ + public static QueryContext of(Object... params) { + return !F.isEmpty(params) ? new QueryContext(build(null, params).toArray()) : new QueryContext(EMPTY); + } + + /** */ + private static List<Object> build(List<Object> dst, Object[] src) { + if (dst == null) + dst = new ArrayList<>(); + + for (Object obj : src) { + if (obj == null) + continue; + + if (obj.getClass() == QueryContext.class) + build(dst, ((QueryContext)obj).params); + else + dst.add(obj); + } + + return dst; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java new file mode 100644 index 0000000..dd51c22 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Collections; +import java.util.List; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.internal.GridKernalContext; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public interface QueryEngine { + /** No op implementation. */ + class NoOpQueryEngine implements QueryEngine { + /** {@inheritDoc} + * @param ctx*/ + @Override public void start(GridKernalContext ctx) {} + + /** {@inheritDoc} */ + @Override public void stop() {} + + /** {@inheritDoc} */ + @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String query, Object... params) throws IgniteSQLException { + return Collections.emptyList(); + } + }; + + /** + * @param ctx Kernal context. + */ + void start(GridKernalContext ctx); + + /** */ + void stop(); + + /** + * @param ctx Query context, may be null. + * @param query Query. + * @param params Optional query parameters. + * @return Query cursor. + * @throws IgniteSQLException If failed. + */ + List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String query, Object... params) throws IgniteSQLException; +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java index d39ec37..6cd19d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java @@ -198,10 +198,8 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { return res; } - /** - * @return Properties. - */ - public Map<String, GridQueryProperty> properties() { + /** {@inheritDoc} */ + @Override public Map<String, GridQueryProperty> properties() { return props; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java new file mode 100644 index 0000000..1a2cdc8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; + +/** + * + */ +public interface SchemaChangeListener { + + void onSchemaCreate(String schemaName); + + void onSchemaDrop(String schemaName); + + void onSqlTypeCreate(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo); + + void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java index 7f89ed1..fc3d099 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycle import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener; +import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener; import org.jetbrains.annotations.NotNull; import static java.util.Objects.requireNonNull; @@ -40,6 +41,9 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter { private final List<MetastorageLifecycleListener> metastorageListeners = new ArrayList<>(); /** */ + private final List<SchemaChangeListener> schemaChangeListeners = new ArrayList<>(); + + /** */ private final List<DistributedMetastorageLifecycleListener> distributedMetastorageListeners = new ArrayList<>(); /** */ @@ -70,6 +74,18 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter { } /** */ + public void registerSchemaChangeListener(@NotNull SchemaChangeListener schemaChangeListener) { + requireNonNull(schemaChangeListener, "Schema change event subscriber should be not-null."); + + schemaChangeListeners.add(schemaChangeListener); + } + + /** */ + public List<SchemaChangeListener> getSchemaChangeSubscribers() { + return schemaChangeListeners; + } + + /** */ public void registerDistributedMetastorageListener(@NotNull DistributedMetastorageLifecycleListener lsnr) { requireNonNull(lsnr, "Global metastorage subscriber should be not-null."); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java index af01bad..a94eaad 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewQuer import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewRunningQueries; import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewSchemas; import org.apache.ignite.internal.processors.query.h2.sys.view.SqlSystemViewTables; +import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.F; @@ -72,6 +73,9 @@ import org.jetbrains.annotations.Nullable; * Schema manager. Responsible for all manipulations on schema objects. */ public class SchemaManager { + /** */ + private final SchemaChangeListener lsnr; + /** Connection manager. */ private final ConnectionManager connMgr; @@ -98,14 +102,14 @@ public class SchemaManager { /** * Constructor. - * - * @param ctx Kernal context. + * @param ctx Kernal context. * @param connMgr Connection manager. */ public SchemaManager(GridKernalContext ctx, ConnectionManager connMgr) { this.ctx = ctx; this.connMgr = connMgr; + lsnr = schemaChangeListener(ctx); log = ctx.log(SchemaManager.class); } @@ -261,6 +265,7 @@ public class SchemaManager { conn = connMgr.connectionForThread().connection(schema.schemaName()); GridH2Table h2tbl = createTable(schema.schemaName(), schema, tblDesc, conn); + lsnr.onSqlTypeCreate(schemaName, type, cacheInfo); schema.add(tblDesc); @@ -297,6 +302,7 @@ public class SchemaManager { tbl.table().setRemoveIndexOnDestroy(rmvIdx); dropTable(tbl); + lsnr.onSqlTypeDrop(schemaName, tbl.type(), tbl.cacheInfo()); } catch (Exception e) { U.error(log, "Failed to drop table on cache stop (will ignore): " + tbl.fullTableName(), e); @@ -362,6 +368,7 @@ public class SchemaManager { */ private void createSchema0(String schema) throws IgniteCheckedException { connMgr.executeSystemStatement("CREATE SCHEMA IF NOT EXISTS " + H2Utils.withQuotes(schema)); + lsnr.onSchemaCreate(schema); if (log.isDebugEnabled()) log.debug("Created H2 schema for index database: " + schema); @@ -526,6 +533,7 @@ public class SchemaManager { */ private void dropSchema(String schema) throws IgniteCheckedException { connMgr.executeSystemStatement("DROP SCHEMA IF EXISTS " + H2Utils.withQuotes(schema)); + lsnr.onSchemaDrop(schema); if (log.isDebugEnabled()) log.debug("Dropped H2 schema for index database: " + schema); @@ -766,4 +774,59 @@ public class SchemaManager { return null; } + + /** */ + private SchemaChangeListener schemaChangeListener(GridKernalContext ctx) { + List<SchemaChangeListener> subscribers = new ArrayList<>(ctx.internalSubscriptionProcessor().getSchemaChangeSubscribers()); + + if (F.isEmpty(subscribers)) + return new NoOpSchemaChangeListener(); + + return subscribers.size() == 1 ? subscribers.get(0) : new CompoundSchemaChangeListener(subscribers); + } + + /** */ + private static final class NoOpSchemaChangeListener implements SchemaChangeListener { + /** {@inheritDoc} */ + @Override public void onSchemaCreate(String schemaName) {} + + /** {@inheritDoc} */ + @Override public void onSchemaDrop(String schemaName) {} + + /** {@inheritDoc} */ + @Override public void onSqlTypeCreate(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) {} + + /** {@inheritDoc} */ + @Override public void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) {} + } + + /** */ + private static final class CompoundSchemaChangeListener implements SchemaChangeListener { + /** */ + private final List<SchemaChangeListener> lsnrs; + + private CompoundSchemaChangeListener(List<SchemaChangeListener> lsnrs) { + this.lsnrs = lsnrs; + } + + /** {@inheritDoc} */ + @Override public void onSchemaCreate(String schemaName) { + lsnrs.forEach(lsnr -> lsnr.onSchemaCreate(schemaName)); + } + + /** {@inheritDoc} */ + @Override public void onSchemaDrop(String schemaName) { + lsnrs.forEach(lsnr -> lsnr.onSchemaCreate(schemaName)); + } + + /** {@inheritDoc} */ + @Override public void onSqlTypeCreate(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) { + lsnrs.forEach(lsnr -> lsnr.onSqlTypeCreate(schemaName, typeDescriptor, cacheInfo)); + } + + /** {@inheritDoc} */ + @Override public void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) { + lsnrs.forEach(lsnr -> lsnr.onSqlTypeDrop(schemaName, typeDescriptor, cacheInfo)); + } + } } diff --git a/parent/pom.xml b/parent/pom.xml index 2d5dce7..3605ecd 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -139,6 +139,7 @@ <zookeeper.version>3.5.5</zookeeper.version> <zstd.version>1.3.7-2</zstd.version> <opencensus.version>0.22.0</opencensus.version> + <calcite.version>1.20.0</calcite.version> <!-- Maven plugins versions --> diff --git a/pom.xml b/pom.xml index dbeeda8..019e6c9 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,7 @@ <module>modules/ml/tensorflow-model-parser</module> <module>modules/tensorflow</module> <module>modules/opencensus</module> + <module>modules/calcite</module> </modules> <profiles>