This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a80f057 refactor federation executor scan table logic (#13988)
a80f057 is described below
commit a80f057f2eafcdaa34b6fb7ed15f4ede0bb470c4
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Dec 9 09:39:00 2021 +0800
refactor federation executor scan table logic (#13988)
* refactor federation executor scan table logic
* remove useless class
* adjust adaptor execute logic
* optimize logic
* resolve code conflict
* fix checkstyle
* optimize logic
* optimize logic
* optimize logic
* optimize java doc
* remove useless method
* fix integration test
* fix integration test
* set parameters for the JDBC statement
* fix integration test
* fix integration test
* fix integration test
* optimize binding table judgment logic
* optimize rel to sql converter init logic
* optimize binding table judgment logic when the join condition is in the WHERE segment
---
.../shardingsphere/sharding/rule/ShardingRule.java | 18 ++-
.../pom.xml | 10 ++
.../federation/executor/FederationExecutor.java | 12 +-
.../customized/CustomizedFilterableExecutor.java | 11 +-
.../original/OriginalFilterableExecutor.java | 32 ++--
.../original/row/FilterableRowEnumerator.java | 45 ++----
.../sql/FilterableExecutionContextGenerator.java | 76 ----------
.../original/sql/FilterableSQLGenerator.java | 63 --------
.../executor/original/sql/RouteContextFilter.java | 57 -------
.../executor/original/table/FilterableTable.java | 14 +-
.../table/FilterableTableScanExecutor.java | 163 +++++++++++++++++++--
.../optimizer/context/OptimizerContext.java | 8 +-
.../optimizer/context/OptimizerContextFactory.java | 8 +-
.../planner/OptimizerPlannerContextFactory.java | 3 +-
.../optimizer/ShardingSphereOptimizerTest.java | 15 +-
.../driver/executor/DriverJDBCExecutor.java | 2 +-
.../statement/ShardingSpherePreparedStatement.java | 43 +++---
.../core/statement/ShardingSphereStatement.java | 47 +++---
.../mode/manager/ContextManager.java | 18 +--
.../mode/metadata/MetaDataContexts.java | 8 +-
.../mode/metadata/MetaDataContextsBuilder.java | 12 +-
.../communication/DatabaseCommunicationEngine.java | 63 +++++++-
.../backend/communication/ProxySQLExecutor.java | 36 +----
.../select_join_encrypt.xml | 12 +-
.../cases/dql/dql-integration-test-cases.xml | 2 +-
25 files changed, 364 insertions(+), 414 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rule/ShardingRule.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rule/ShardingRule.java
index e6bf2a7..9aefc06 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rule/ShardingRule.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rule/ShardingRule.java
@@ -55,6 +55,7 @@ import
org.apache.shardingsphere.sql.parser.sql.common.util.WhereExtractUtil;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -288,7 +289,13 @@ public final class ShardingRule implements SchemaRule,
DataNodeContainedRule, Ta
if (!(sqlStatementContext instanceof SelectStatementContext &&
((SelectStatementContext) sqlStatementContext).isContainsJoinQuery())) {
return isAllBindingTables(logicTableNames);
}
- return isAllBindingTables(logicTableNames) &&
isJoinConditionContainsShardingColumns(schema, (SelectStatementContext)
sqlStatementContext, logicTableNames);
+ if (!isAllBindingTables(logicTableNames)) {
+ return false;
+ }
+ SelectStatementContext select = (SelectStatementContext)
sqlStatementContext;
+ Collection<WhereSegment> joinSegments =
WhereExtractUtil.getJoinWhereSegments(select.getSqlStatement());
+ Collection<WhereSegment> whereSegments = select.getWhere().isPresent()
? Collections.singletonList(select.getWhere().get()) : Collections.emptyList();
+ return isJoinConditionContainsShardingColumns(schema, select,
logicTableNames, joinSegments) ||
isJoinConditionContainsShardingColumns(schema, select, logicTableNames,
whereSegments);
}
private Optional<BindingTableRule> findBindingTableRule(final
Collection<String> logicTableNames) {
@@ -554,10 +561,11 @@ public final class ShardingRule implements SchemaRule,
DataNodeContainedRule, Ta
return ShardingRule.class.getSimpleName();
}
- private boolean isJoinConditionContainsShardingColumns(final
ShardingSphereSchema schema, final SelectStatementContext select, final
Collection<String> tableNames) {
+ private boolean isJoinConditionContainsShardingColumns(final
ShardingSphereSchema schema, final SelectStatementContext select,
+ final
Collection<String> tableNames, final Collection<WhereSegment> whereSegments) {
Collection<String> databaseJoinConditionTables = new
HashSet<>(tableNames.size());
Collection<String> tableJoinConditionTables = new
HashSet<>(tableNames.size());
- for (WhereSegment each :
WhereExtractUtil.getJoinWhereSegments(select.getSqlStatement())) {
+ for (WhereSegment each : whereSegments) {
Collection<AndPredicate> andPredicates =
ExpressionExtractUtil.getAndPredicates(each.getExpr());
if (andPredicates.size() > 1) {
return false;
@@ -568,9 +576,9 @@ public final class ShardingRule implements SchemaRule,
DataNodeContainedRule, Ta
}
}
TableRule tableRule = getTableRule(tableNames.iterator().next());
- boolean containsDatabaseShardingColumns =
!(tableRule.getDatabaseShardingStrategyConfig() instanceof
StandardShardingStrategyConfiguration)
+ boolean containsDatabaseShardingColumns =
!(getDatabaseShardingStrategyConfiguration(tableRule) instanceof
StandardShardingStrategyConfiguration)
|| databaseJoinConditionTables.containsAll(tableNames);
- boolean containsTableShardingColumns =
!(tableRule.getTableShardingStrategyConfig() instanceof
StandardShardingStrategyConfiguration) ||
tableJoinConditionTables.containsAll(tableNames);
+ boolean containsTableShardingColumns =
!(getTableShardingStrategyConfiguration(tableRule) instanceof
StandardShardingStrategyConfiguration) ||
tableJoinConditionTables.containsAll(tableNames);
return containsDatabaseShardingColumns && containsTableShardingColumns;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/pom.xml
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/pom.xml
index 707751a..7e79a9b 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/pom.xml
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/pom.xml
@@ -38,5 +38,15 @@
<artifactId>shardingsphere-infra-executor</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-infra-context</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-infra-merge</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutor.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutor.java
index 53df685..b3f74c1 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutor.java
@@ -17,17 +17,15 @@
package org.apache.shardingsphere.infra.federation.executor;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.List;
/**
* Federation executor.
@@ -39,12 +37,12 @@ public interface FederationExecutor extends AutoCloseable {
*
* @param prepareEngine prepare engine
* @param callback callback
- * @param executionContext execution context
- * @return query results
+ * @param logicSQL logic SQL
+ * @return result set
* @throws SQLException SQL exception
*/
- List<QueryResult>
executeQuery(DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine,
- JDBCExecutorCallback<? extends
ExecuteResult> callback, ExecutionContext executionContext) throws SQLException;
+ ResultSet executeQuery(DriverExecutionPrepareEngine<JDBCExecutionUnit,
Connection> prepareEngine,
+ JDBCExecutorCallback<? extends ExecuteResult>
callback, LogicSQL logicSQL) throws SQLException;
/**
* Get result set.
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/customized/CustomizedFilterableExecutor.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/customized/CustomizedFilterableExecutor.java
index eb7bec2..558491b 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/customized/CustomizedFilterableExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/customized/CustomizedFilterableExecutor.java
@@ -23,11 +23,10 @@ import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
import
org.apache.shardingsphere.infra.federation.optimizer.ShardingSphereOptimizer;
@@ -37,8 +36,6 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
/**
* Customized filterable executor.
@@ -55,10 +52,10 @@ public final class CustomizedFilterableExecutor implements
FederationExecutor {
}
@Override
- public List<QueryResult> executeQuery(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutorCallback<? extends
ExecuteResult> callback, final ExecutionContext executionContext) throws
SQLException {
+ public ResultSet executeQuery(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final JDBCExecutorCallback<? extends
ExecuteResult> callback, final LogicSQL logicSQL) throws SQLException {
// TODO
- return Collections.emptyList();
+ return null;
}
@Override
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/OriginalFilterableExecutor.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/OriginalFilterableExecutor.java
index 01a1a57..70e8506 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/OriginalFilterableExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/OriginalFilterableExecutor.java
@@ -19,15 +19,12 @@ package
org.apache.shardingsphere.infra.federation.executor.original;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
import
org.apache.shardingsphere.infra.federation.executor.original.table.FilterableTableScanExecutor;
@@ -40,7 +37,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Collections;
import java.util.List;
/**
@@ -72,33 +68,25 @@ public final class OriginalFilterableExecutor implements
FederationExecutor {
}
@Override
- public List<QueryResult> executeQuery(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutorCallback<? extends
ExecuteResult> callback, final ExecutionContext executionContext) throws
SQLException {
- ResultSet resultSet = execute(prepareEngine, callback,
executionContext);
- return Collections.singletonList(new JDBCStreamQueryResult(resultSet));
- }
-
- private ResultSet execute(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutorCallback<? extends
ExecuteResult> callback, final ExecutionContext executionContext) throws
SQLException {
- SQLUnit sqlUnit =
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
- PreparedStatement preparedStatement = createConnection(prepareEngine,
callback,
executionContext).prepareStatement(SQLUtil.trimSemicolon(sqlUnit.getSql()));
- setParameters(preparedStatement, sqlUnit.getParameters());
+ public ResultSet executeQuery(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final JDBCExecutorCallback<? extends
ExecuteResult> callback, final LogicSQL logicSQL) throws SQLException {
+ PreparedStatement preparedStatement = createConnection(prepareEngine,
callback,
logicSQL.getParameters()).prepareStatement(SQLUtil.trimSemicolon(logicSQL.getSql()));
+ setParameters(preparedStatement, logicSQL.getParameters());
this.statement = preparedStatement;
return preparedStatement.executeQuery();
}
private Connection createConnection(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutorCallback<? extends
ExecuteResult> callback, final ExecutionContext executionContext) throws
SQLException {
+ final JDBCExecutorCallback<? extends
ExecuteResult> callback, final List<Object> parameters) throws SQLException {
Connection result = DriverManager.getConnection(CONNECTION_URL,
optimizerContext.getParserContexts().get(schemaName).getDialectProps());
- addSchema(result.unwrap(CalciteConnection.class), prepareEngine,
callback, executionContext);
+ addSchema(result.unwrap(CalciteConnection.class), prepareEngine,
callback, parameters);
return result;
}
private void addSchema(final CalciteConnection connection, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutorCallback<? extends ExecuteResult>
callback, final ExecutionContext executionContext) throws SQLException {
- FilterableTableScanExecutor executor = new FilterableTableScanExecutor(
- prepareEngine, jdbcExecutor, callback, props,
executionContext,
optimizerContext.getParserContexts().get(schemaName).getDatabaseType().getQuoteCharacter());
- FilterableSchema schema = new
FilterableSchema(optimizerContext.getMetaData().getSchemas().get(schemaName),
executor);
+ final JDBCExecutorCallback<? extends ExecuteResult>
callback, final List<Object> parameters) throws SQLException {
+ FilterableTableScanExecutor executor = new
FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, props,
optimizerContext, schemaName, parameters);
+ FilterableSchema schema = new
FilterableSchema(optimizerContext.getFederationMetaData().getSchemas().get(schemaName),
executor);
connection.getRootSchema().add(schemaName, schema);
connection.setSchema(schemaName);
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/row/FilterableRowEnumerator.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/row/FilterableRowEnumerator.java
index 3b55e96..83ee68f 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/row/FilterableRowEnumerator.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/row/FilterableRowEnumerator.java
@@ -18,31 +18,25 @@
package org.apache.shardingsphere.infra.federation.executor.original.row;
import org.apache.calcite.linq4j.Enumerator;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
/**
* Filterable row enumerator.
*/
public final class FilterableRowEnumerator implements Enumerator<Object[]> {
- private final Collection<QueryResult> queryResults = new LinkedList<>();
+ private final MergedResult result;
- private final Iterator<QueryResult> iterator;
-
- private QueryResult currentResultSet;
+ private final QueryResultMetaData metaData;
private Object[] currentRow;
- public FilterableRowEnumerator(final Collection<QueryResult> queryResults)
{
- this.queryResults.addAll(queryResults);
- iterator = this.queryResults.iterator();
- currentResultSet = iterator.next();
+ public FilterableRowEnumerator(final MergedResult queryResult, final
QueryResultMetaData metaData) {
+ this.result = queryResult;
+ this.metaData = metaData;
}
@Override
@@ -60,23 +54,17 @@ public final class FilterableRowEnumerator implements
Enumerator<Object[]> {
}
private boolean moveNext0() throws SQLException {
- if (currentResultSet.next()) {
+ if (result.next()) {
setCurrentRow();
return true;
}
- if (!iterator.hasNext()) {
- currentRow = null;
- return false;
- }
- currentResultSet = iterator.next();
- return moveNext0();
+ return false;
}
private void setCurrentRow() throws SQLException {
- int columnCount = currentResultSet.getMetaData().getColumnCount();
- currentRow = new Object[columnCount];
- for (int i = 0; i < columnCount; i++) {
- currentRow[i] = currentResultSet.getValue(i + 1, Object.class);
+ currentRow = new Object[metaData.getColumnCount()];
+ for (int i = 0; i < metaData.getColumnCount(); i++) {
+ currentRow[i] = result.getValue(i + 1, Object.class);
}
}
@@ -86,13 +74,6 @@ public final class FilterableRowEnumerator implements
Enumerator<Object[]> {
@Override
public void close() {
- try {
- for (QueryResult each : queryResults) {
- each.close();
- }
- currentRow = null;
- } catch (final SQLException ex) {
- throw new ShardingSphereException(ex);
- }
+ currentRow = null;
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableExecutionContextGenerator.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableExecutionContextGenerator.java
deleted file mode 100644
index 758e4b2..0000000
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableExecutionContextGenerator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.federation.executor.original.sql;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
-import
org.apache.shardingsphere.infra.federation.executor.original.table.FilterableTableScanContext;
-import
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationTableMetaData;
-import org.apache.shardingsphere.infra.route.context.RouteContext;
-import org.apache.shardingsphere.infra.route.context.RouteMapper;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
-import org.apache.shardingsphere.sql.parser.sql.common.constant.QuoteCharacter;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.stream.Collectors;
-
-/**
- * Filterable execution context generator.
- */
-@RequiredArgsConstructor
-public final class FilterableExecutionContextGenerator {
-
- private final ExecutionContext routeExecutionContext;
-
- private final QuoteCharacter quoteCharacter;
-
- /**
- * Generate execution context.
- *
- * @param tableMetaData table meta data
- * @param scanContext filterable table scan context
- * @return generated execution context
- */
- public ExecutionContext generate(final FederationTableMetaData
tableMetaData, final FilterableTableScanContext scanContext) {
- RouteContext filteredRouteContext = new
RouteContextFilter().filter(tableMetaData.getName(),
routeExecutionContext.getRouteContext());
- return new ExecutionContext(routeExecutionContext.getLogicSQL(),
generate(filteredRouteContext.getRouteUnits(), tableMetaData, scanContext,
quoteCharacter), filteredRouteContext);
- }
-
- private Collection<ExecutionUnit> generate(final Collection<RouteUnit>
routeUnits,
- final FederationTableMetaData
tableMetaData, final FilterableTableScanContext scanContext, final
QuoteCharacter quoteCharacter) {
- Collection<ExecutionUnit> result = new LinkedHashSet<>();
- FilterableSQLGenerator sqlGenerator = new
FilterableSQLGenerator(tableMetaData, scanContext, quoteCharacter);
- for (RouteUnit each: routeUnits) {
- result.addAll(generate(each, sqlGenerator));
- }
- return result;
- }
-
- private Collection<ExecutionUnit> generate(final RouteUnit routeUnit,
final FilterableSQLGenerator sqlGenerator) {
- return routeUnit.getTableMappers().stream().map(each ->
generate(routeUnit, each, sqlGenerator)).collect(Collectors.toList());
- }
-
- private ExecutionUnit generate(final RouteUnit routeUnit, final
RouteMapper tableMapper, final FilterableSQLGenerator sqlGenerator) {
- String sql = sqlGenerator.generate(tableMapper.getActualName());
- return new
ExecutionUnit(routeUnit.getDataSourceMapper().getActualName(), new SQLUnit(sql,
Collections.emptyList(), Collections.singletonList(tableMapper)));
- }
-}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableSQLGenerator.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableSQLGenerator.java
deleted file mode 100644
index 0a4a00e..0000000
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableSQLGenerator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.federation.executor.original.sql;
-
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.federation.executor.original.table.FilterableTableScanContext;
-import
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationTableMetaData;
-import org.apache.shardingsphere.sql.parser.sql.common.constant.QuoteCharacter;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.stream.Collectors;
-
-/**
- * Filterable SQL generator.
- */
-@RequiredArgsConstructor
-public final class FilterableSQLGenerator {
-
- private final FederationTableMetaData tableMetaData;
-
- private final FilterableTableScanContext scanContext;
-
- private final QuoteCharacter quoteCharacter;
-
- /**
- * Generate federation SQL.
- *
- * @param actualTableName actual table name
- * @return generated federation SQL
- */
- public String generate(final String actualTableName) {
- String projections = getQuotedProjections(tableMetaData, scanContext,
quoteCharacter);
- String table = getQuotedTable(actualTableName, quoteCharacter);
- // TODO generate SQL with filters
- return String.format("SELECT %s FROM %s", projections, table);
- }
-
- private String getQuotedProjections(final FederationTableMetaData
tableMetaData, final FilterableTableScanContext scanContext, final
QuoteCharacter quoteCharacter) {
- Collection<String> actualColumnNames = null ==
scanContext.getProjects()
- ? tableMetaData.getColumnNames() :
Arrays.stream(scanContext.getProjects()).mapToObj(tableMetaData.getColumnNames()::get).collect(Collectors.toList());
- return
actualColumnNames.stream().map(quoteCharacter::wrap).collect(Collectors.joining(",
"));
- }
-
- private String getQuotedTable(final String actualTableName, final
QuoteCharacter quoteCharacter) {
- return quoteCharacter.wrap(actualTableName);
- }
-}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/RouteContextFilter.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/RouteContextFilter.java
deleted file mode 100644
index 580e214..0000000
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/RouteContextFilter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.federation.executor.original.sql;
-
-import org.apache.shardingsphere.infra.route.context.RouteContext;
-import org.apache.shardingsphere.infra.route.context.RouteMapper;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Route context filter.
- */
-public final class RouteContextFilter {
-
- /**
- * Filter route context.
- *
- * @param toBeKeptTableName to be kept table name
- * @param routeContext route context
- * @return filtered route context
- */
- public RouteContext filter(final String toBeKeptTableName, final
RouteContext routeContext) {
- RouteContext result = new RouteContext();
- result.getRouteUnits().addAll(filterRouteUnits(toBeKeptTableName,
routeContext));
- return result;
- }
-
- private Collection<RouteUnit> filterRouteUnits(final String
toBeKeptTableName, final RouteContext routeContext) {
- return routeContext.getRouteUnits().stream().map(each ->
filterRouteUnit(toBeKeptTableName, each)).filter(each ->
!each.getTableMappers().isEmpty()).collect(Collectors.toList());
- }
-
- private RouteUnit filterRouteUnit(final String toBeKeptTableName, final
RouteUnit routeUnit) {
- return new RouteUnit(routeUnit.getDataSourceMapper(),
filterTableMappers(toBeKeptTableName, routeUnit));
- }
-
- private List<RouteMapper> filterTableMappers(final String
toBeKeptTableName, final RouteUnit routeUnit) {
- return routeUnit.getTableMappers().stream().filter(each ->
each.getLogicName().equalsIgnoreCase(toBeKeptTableName)).collect(Collectors.toList());
- }
-}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTable.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTable.java
index 886cafa..a1e165c 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTable.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTable.java
@@ -19,21 +19,16 @@ package
org.apache.shardingsphere.infra.federation.executor.original.table;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ProjectableFilterableTable;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.impl.AbstractTable;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.federation.executor.original.FederationTableStatistic;
-import
org.apache.shardingsphere.infra.federation.executor.original.row.FilterableRowEnumerator;
import
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationTableMetaData;
-import java.util.Collection;
import java.util.List;
/**
@@ -55,14 +50,7 @@ public final class FilterableTable extends AbstractTable
implements ProjectableF
@Override
public Enumerable<Object[]> scan(final DataContext root, final
List<RexNode> filters, final int[] projects) {
- Collection<QueryResult> queryResults = executor.execute(metaData, new
FilterableTableScanContext(root, filters, projects));
- return new AbstractEnumerable<Object[]>() {
-
- @Override
- public Enumerator<Object[]> enumerator() {
- return new FilterableRowEnumerator(queryResults);
- }
- };
+ return executor.execute(metaData, new FilterableTableScanContext(root,
filters, projects));
}
@Override
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
index f988190..8c5ffc6 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
@@ -17,8 +17,40 @@
package org.apache.shardingsphere.infra.federation.executor.original.table;
+import com.google.common.collect.ImmutableList;
+import lombok.SneakyThrows;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.dialect.MssqlSqlDialect;
+import org.apache.calcite.sql.dialect.MysqlSqlDialect;
+import org.apache.calcite.sql.dialect.OracleSqlDialect;
+import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
+import org.apache.calcite.sql.util.SqlString;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
+import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
+import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
+import
org.apache.shardingsphere.infra.database.type.dialect.MariaDBDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import
org.apache.shardingsphere.infra.database.type.dialect.OracleDatabaseType;
+import
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import
org.apache.shardingsphere.infra.database.type.dialect.SQLServerDatabaseType;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -26,15 +58,28 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
-import
org.apache.shardingsphere.infra.federation.executor.original.sql.FilterableExecutionContextGenerator;
+import
org.apache.shardingsphere.infra.federation.executor.original.row.FilterableRowEnumerator;
+import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
import
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationTableMetaData;
-import org.apache.shardingsphere.sql.parser.sql.common.constant.QuoteCharacter;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.parser.sql.SQLStatementParserEngine;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -42,6 +87,18 @@ import java.util.stream.Collectors;
*/
public final class FilterableTableScanExecutor {
+ private static final Map<Class<? extends DatabaseType>, SqlDialect>
SQL_DIALECTS = new HashMap<>();
+
+ static {
+ SQL_DIALECTS.put(H2DatabaseType.class, MysqlSqlDialect.DEFAULT);
+ SQL_DIALECTS.put(MySQLDatabaseType.class, MysqlSqlDialect.DEFAULT);
+ SQL_DIALECTS.put(MariaDBDatabaseType.class, MysqlSqlDialect.DEFAULT);
+ SQL_DIALECTS.put(OracleDatabaseType.class, OracleSqlDialect.DEFAULT);
+ SQL_DIALECTS.put(SQLServerDatabaseType.class, MssqlSqlDialect.DEFAULT);
+ SQL_DIALECTS.put(PostgreSQLDatabaseType.class,
PostgresqlSqlDialect.DEFAULT);
+ SQL_DIALECTS.put(OpenGaussDatabaseType.class,
PostgresqlSqlDialect.DEFAULT);
+ }
+
private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine;
private final JDBCExecutor jdbcExecutor;
@@ -50,16 +107,22 @@ public final class FilterableTableScanExecutor {
private final ConfigurationProperties props;
- private final FilterableExecutionContextGenerator
executionContextGenerator;
+ private final OptimizerContext optimizerContext;
+
+ private final String schemaName;
- public FilterableTableScanExecutor(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutor jdbcExecutor, final
JDBCExecutorCallback<? extends ExecuteResult> callback,
- final ConfigurationProperties props,
final ExecutionContext routeExecutionContext, final QuoteCharacter
quoteCharacter) {
+ private final List<Object> parameters;
+
+ public FilterableTableScanExecutor(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final JDBCExecutor jdbcExecutor,
+ final JDBCExecutorCallback<? extends
ExecuteResult> callback, final ConfigurationProperties props,
+ final OptimizerContext
optimizerContext, final String schemaName, final List<Object> parameters) {
this.jdbcExecutor = jdbcExecutor;
this.callback = callback;
this.prepareEngine = prepareEngine;
this.props = props;
- executionContextGenerator = new
FilterableExecutionContextGenerator(routeExecutionContext, quoteCharacter);
+ this.optimizerContext = optimizerContext;
+ this.schemaName = schemaName;
+ this.parameters = parameters;
}
/**
@@ -69,18 +132,96 @@ public final class FilterableTableScanExecutor {
* @param scanContext filterable table scan context
* @return query results
*/
- public Collection<QueryResult> execute(final FederationTableMetaData
tableMetaData, final FilterableTableScanContext scanContext) {
- ExecutionContext context =
executionContextGenerator.generate(tableMetaData, scanContext);
+ public Enumerable<Object[]> execute(final FederationTableMetaData
tableMetaData, final FilterableTableScanContext scanContext) {
+ DatabaseType databaseType =
DatabaseTypeRegistry.getTrunkDatabaseType(optimizerContext.getParserContexts().get(schemaName).getDatabaseType().getName());
+ SqlString sqlString = createSQLString(tableMetaData, scanContext,
databaseType);
+ // TODO replace sql parse with sql convert
+ SQLStatement sqlStatement = new
SQLStatementParserEngine(databaseType.getName(),
optimizerContext.getSqlParserRule()).parse(sqlString.getSql(), false);
+ LogicSQL logicSQL = createLogicSQL(optimizerContext.getMetaDataMap(),
sqlString.getSql(), getParameters(sqlString.getDynamicParameters()),
sqlStatement);
+ ShardingSphereMetaData metaData =
optimizerContext.getMetaDataMap().get(schemaName);
+ ExecutionContext context = new
KernelProcessor().generateExecutionContext(logicSQL, metaData, props);
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits());
+ setParameters(executionGroupContext.getInputGroups(),
logicSQL.getParameters());
ExecuteProcessEngine.initialize(context.getLogicSQL(),
executionGroupContext, props);
- Collection<QueryResult> result =
jdbcExecutor.execute(executionGroupContext, callback).stream().map(each ->
(QueryResult) each).collect(Collectors.toList());
+ List<QueryResult> result =
jdbcExecutor.execute(executionGroupContext, callback).stream().map(each ->
(QueryResult) each).collect(Collectors.toList());
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
- return result;
+ MergeEngine mergeEngine = new MergeEngine(schemaName,
databaseType, metaData.getSchema(), props,
metaData.getRuleMetaData().getRules());
+ MergedResult mergedResult = mergeEngine.merge(result,
logicSQL.getSqlStatementContext());
+ return createEnumerable(mergedResult, result.get(0).getMetaData());
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
} finally {
ExecuteProcessEngine.clean();
}
}
+
+ private SqlString createSQLString(final FederationTableMetaData
tableMetaData, final FilterableTableScanContext scanContext, final DatabaseType
databaseType) {
+ SqlDialect sqlDialect =
SQL_DIALECTS.getOrDefault(databaseType.getClass(), MysqlSqlDialect.DEFAULT);
+ return new
RelToSqlConverter(sqlDialect).visitRoot(createRelNode(tableMetaData,
scanContext)).asStatement().toSqlString(sqlDialect);
+ }
+
+ @SneakyThrows
+ private void setParameters(final
Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups, final List<Object>
parameters) {
+ for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) {
+ for (JDBCExecutionUnit executionUnit : each.getInputs()) {
+ if (!(executionUnit.getStorageResource() instanceof
PreparedStatement)) {
+ continue;
+ }
+ setParameters((PreparedStatement)
executionUnit.getStorageResource(), parameters);
+ }
+ }
+ }
+
+ @SneakyThrows
+ private void setParameters(final PreparedStatement preparedStatement,
final List<Object> parameters) {
+ for (int i = 0; i < parameters.size(); i++) {
+ Object parameter = parameters.get(i);
+ preparedStatement.setObject(i + 1, parameter);
+ }
+ }
+
+ private List<Object> getParameters(final ImmutableList<Integer>
parameterIndices) {
+ if (null == parameterIndices) {
+ return Collections.emptyList();
+ }
+ List<Object> result = new ArrayList<>();
+ for (Integer each : parameterIndices) {
+ result.add(parameters.get(each));
+ }
+ return result;
+ }
+
+ private RelNode createRelNode(final FederationTableMetaData tableMetaData,
final FilterableTableScanContext scanContext) {
+ RelOptCluster relOptCluster =
optimizerContext.getPlannerContexts().get(schemaName).getConverter().getCluster();
+ RelOptSchema relOptSchema = (RelOptSchema)
optimizerContext.getPlannerContexts().get(schemaName).getValidator().getCatalogReader();
+ RelBuilder builder =
RelFactories.LOGICAL_BUILDER.create(relOptCluster,
relOptSchema).scan(tableMetaData.getName()).filter(scanContext.getFilters());
+ if (null != scanContext.getProjects()) {
+ builder.project(createProjections(scanContext.getProjects(),
builder, tableMetaData.getColumnNames()));
+ }
+ return builder.build();
+ }
+
+ private Collection<RexNode> createProjections(final int[] projects, final
RelBuilder relBuilder, final List<String> columnNames) {
+ Collection<RexNode> result = new LinkedList<>();
+ for (int each : projects) {
+ result.add(relBuilder.field(columnNames.get(each)));
+ }
+ return result;
+ }
+
+ private AbstractEnumerable<Object[]> createEnumerable(final MergedResult
mergedResult, final QueryResultMetaData metaData) {
+ return new AbstractEnumerable<Object[]>() {
+
+ @Override
+ public Enumerator<Object[]> enumerator() {
+ return new FilterableRowEnumerator(mergedResult, metaData);
+ }
+ };
+ }
+
+ private LogicSQL createLogicSQL(final Map<String, ShardingSphereMetaData>
metaDataMap, final String sql, final List<Object> parameters, final
SQLStatement sqlStatement) {
+ SQLStatementContext<?> sqlStatementContext =
SQLStatementContextFactory.newInstance(metaDataMap, parameters, sqlStatement,
sql);
+ return new LogicSQL(sqlStatementContext, sql, parameters);
+ }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContext.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContext.java
index d82263b..d892f42 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContext.java
@@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.infra.federation.optimizer.context.parser.OptimizerParserContext;
import
org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContext;
import
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.util.Map;
@@ -32,7 +34,11 @@ import java.util.Map;
@Getter
public final class OptimizerContext {
- private final FederationMetaData metaData;
+ private final SQLParserRule sqlParserRule;
+
+ private final FederationMetaData federationMetaData;
+
+ private final Map<String, ShardingSphereMetaData> metaDataMap;
private final Map<String, OptimizerParserContext> parserContexts;
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContextFactory.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContextFactory.java
index 170df49..c8b15a1 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContextFactory.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContextFactory.java
@@ -25,6 +25,8 @@ import
org.apache.shardingsphere.infra.federation.optimizer.context.planner.Opti
import
org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory;
import
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.util.Map;
@@ -38,12 +40,14 @@ public final class OptimizerContextFactory {
* Create optimize context.
*
* @param metaDataMap meta data map
+ * @param globalRuleMetaData global rule meta data
* @return created optimizer context
*/
- public static OptimizerContext create(final Map<String,
ShardingSphereMetaData> metaDataMap) {
+ public static OptimizerContext create(final Map<String,
ShardingSphereMetaData> metaDataMap, final ShardingSphereRuleMetaData
globalRuleMetaData) {
FederationMetaData federationMetaData = new
FederationMetaData(metaDataMap);
Map<String, OptimizerParserContext> parserContexts =
OptimizerParserContextFactory.create(metaDataMap);
Map<String, OptimizerPlannerContext> plannerContexts =
OptimizerPlannerContextFactory.create(federationMetaData);
- return new OptimizerContext(federationMetaData, parserContexts,
plannerContexts);
+ SQLParserRule sqlParserRule =
globalRuleMetaData.findSingleRule(SQLParserRule.class).orElse(null);
+ return new OptimizerContext(sqlParserRule, federationMetaData,
metaDataMap, parserContexts, plannerContexts);
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
index 072e2b4..fda7b2a 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
@@ -68,7 +68,8 @@ public final class OptimizerPlannerContextFactory {
RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
CalciteCatalogReader catalogReader =
createCatalogReader(schemaName, federationSchema, relDataTypeFactory,
connectionConfig);
SqlValidator validator = createValidator(catalogReader,
relDataTypeFactory, connectionConfig);
- result.put(schemaName, new OptimizerPlannerContext(validator,
createConverter(catalogReader, validator, relDataTypeFactory)));
+ SqlToRelConverter converter = createConverter(catalogReader,
validator, relDataTypeFactory);
+ result.put(schemaName, new OptimizerPlannerContext(validator,
converter));
}
return result;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
index d7fd09d..ba21e39 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
@@ -22,20 +22,26 @@ import
org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
+import
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.model.ColumnMetaData;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import
org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder;
+import org.apache.shardingsphere.sql.parser.api.CacheOption;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.junit.Before;
import org.junit.Test;
import java.sql.Types;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
@@ -100,7 +106,14 @@ public final class ShardingSphereOptimizerTest {
tableMetaDataMap.put("t_order_federate", createOrderTableMetaData());
tableMetaDataMap.put("t_user_info", createUserInfoTableMetaData());
ShardingSphereMetaData metaData = new
ShardingSphereMetaData(schemaName, mockResource(), null, new
ShardingSphereSchema(tableMetaDataMap));
- optimizer = new
ShardingSphereOptimizer(OptimizerContextFactory.create(Collections.singletonMap(schemaName,
metaData)));
+ optimizer = new
ShardingSphereOptimizer(OptimizerContextFactory.create(Collections.singletonMap(schemaName,
metaData), createGlobalRuleMetaData()));
+ }
+
+ private ShardingSphereRuleMetaData createGlobalRuleMetaData() {
+ Collection<ShardingSphereRule> rules = new LinkedList<>();
+ CacheOption cacheOption = new CacheOption(128, 1024L, 4);
+ rules.add(new SQLParserRule(new SQLParserRuleConfiguration(false,
cacheOption, cacheOption)));
+ return new ShardingSphereRuleMetaData(Collections.emptyList(), rules);
}
private ShardingSphereResource mockResource() {
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index f2c25ec..b04833b 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -58,7 +58,7 @@ public final class DriverJDBCExecutor {
this.metaDataContexts = metaDataContexts;
this.jdbcExecutor = jdbcExecutor;
metadataRefreshEngine = new
MetaDataRefreshEngine(metaDataContexts.getMetaData(schemaName),
-
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().get(schemaName),
metaDataContexts.getProps());
+
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().get(schemaName),
metaDataContexts.getProps());
}
/**
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index bf36cb7..6f7d295 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -82,7 +82,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -180,7 +179,12 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
return statements.iterator().next().executeQuery();
}
clearPrevious();
- executionContext = createExecutionContext();
+ LogicSQL logicSQL = createLogicSQL();
+ // TODO move federation route logic to binder
+ executionContext = createExecutionContext(logicSQL);
+ if (executionContext.getRouteContext().isFederated()) {
+ return executeFederationQuery(logicSQL);
+ }
List<QueryResult> queryResults = executeQuery0();
MergedResult mergedResult = mergeQuery(queryResults);
return new ShardingSphereResultSet(getShardingSphereResultSet(),
mergedResult, this, executionContext);
@@ -195,10 +199,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
replaySetParameter();
}
- private List<ResultSet> getShardingSphereResultSet() throws SQLException {
- if (executionContext.getRouteContext().isFederated()) {
- return
Collections.singletonList(executor.getFederationExecutor().getResultSet());
- }
+ private List<ResultSet> getShardingSphereResultSet() {
List<ResultSet> result = new ArrayList<>(statements.size());
for (PreparedStatement each : statements) {
ResultSet resultSet = getResultSet(each);
@@ -212,9 +213,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
return
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getLogicSQL(),
new RawSQLExecutorCallback()).stream().map(each ->
(QueryResult) each).collect(Collectors.toList());
}
- if (executionContext.getRouteContext().isFederated()) {
- return executeFederatedQuery();
- }
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
return
executor.getRegularExecutor().executeQuery(executionGroupContext,
executionContext.getLogicSQL(),
@@ -222,13 +220,10 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
SQLExecutorExceptionHandler.isExceptionThrown()));
}
- private List<QueryResult> executeFederatedQuery() throws SQLException {
- if (executionContext.getExecutionUnits().isEmpty()) {
- return Collections.emptyList();
- }
+ private ResultSet executeFederationQuery(final LogicSQL logicSQL) throws
SQLException {
PreparedStatementExecuteQueryCallback callback = new
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchema()).getResource().getDatabaseType(),
sqlStatement,
SQLExecutorExceptionHandler.isExceptionThrown());
- return
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
callback, executionContext);
+ return
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
callback, logicSQL);
}
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine() {
@@ -245,7 +240,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
return statements.iterator().next().executeUpdate();
}
clearPrevious();
- executionContext = createExecutionContext();
+ executionContext = createExecutionContext(createLogicSQL());
if
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
return accumulate(executeResults);
@@ -291,15 +286,16 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
return statements.iterator().next().execute();
}
clearPrevious();
- executionContext = createExecutionContext();
+ LogicSQL logicSQL = createLogicSQL();
+ executionContext = createExecutionContext(logicSQL);
if
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
// TODO process getStatement
Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
return executeResults.iterator().next() instanceof QueryResult;
}
if (executionContext.getRouteContext().isFederated()) {
- List<QueryResult> queryResults = executeFederatedQuery();
- return !queryResults.isEmpty();
+ ResultSet resultSet = executeFederationQuery(logicSQL);
+ return null != resultSet;
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
@@ -342,6 +338,9 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
if (null != currentResultSet) {
return currentResultSet;
}
+ if (executionContext.getRouteContext().isFederated()) {
+ return executor.getFederationExecutor().getResultSet();
+ }
if (executionContext.getSqlStatementContext() instanceof
SelectStatementContext ||
executionContext.getSqlStatementContext().getSqlStatement() instanceof
DALStatement) {
List<ResultSet> resultSets = getResultSets();
MergedResult mergedResult =
mergeQuery(getQueryResults(resultSets));
@@ -363,9 +362,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
for (Statement each : statements) {
result.add(each.getResultSet());
}
- if (executionContext.getRouteContext().isFederated()) {
- result.add(executor.getFederationExecutor().getResultSet());
- }
return result;
}
@@ -379,8 +375,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
return result;
}
- private ExecutionContext createExecutionContext() {
- LogicSQL logicSQL = createLogicSQL();
+ private ExecutionContext createExecutionContext(final LogicSQL logicSQL) {
SQLCheckEngine.check(logicSQL.getSqlStatementContext().getSqlStatement(),
logicSQL.getParameters(),
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules(),
connection.getSchema(), metaDataContexts.getMetaDataMap(), null);
ExecutionContext result =
kernelProcessor.generateExecutionContext(logicSQL,
metaDataContexts.getMetaData(connection.getSchema()),
metaDataContexts.getProps());
@@ -452,7 +447,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
@Override
public void addBatch() {
try {
- executionContext = createExecutionContext();
+ executionContext = createExecutionContext(createLogicSQL());
batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
} finally {
currentResultSet = null;
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index e92ef61..0872bcf 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -133,7 +133,12 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
ResultSet result;
try {
- executionContext = createExecutionContext(sql);
+ LogicSQL logicSQL = createLogicSQL(sql);
+ executionContext = createExecutionContext(logicSQL);
+ // TODO move federation route logic to binder
+ if (executionContext.getRouteContext().isFederated()) {
+ return executeFederationQuery(logicSQL);
+ }
List<QueryResult> queryResults = executeQuery0();
MergedResult mergedResult = mergeQuery(queryResults);
result = new
ShardingSphereResultSet(getShardingSphereResultSets(), mergedResult, this,
executionContext);
@@ -144,9 +149,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return result;
}
- private List<ResultSet> getShardingSphereResultSets() throws SQLException {
- return executionContext.getRouteContext().isFederated()
- ?
Collections.singletonList(executor.getFederationExecutor().getResultSet()) :
statements.stream().map(this::getResultSet).collect(Collectors.toList());
+ private List<ResultSet> getShardingSphereResultSets() {
+ return
statements.stream().map(this::getResultSet).collect(Collectors.toList());
}
private List<QueryResult> executeQuery0() throws SQLException {
@@ -154,9 +158,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return
executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(),
new RawSQLExecutorCallback()).stream().map(each ->
(QueryResult) each).collect(Collectors.toList());
}
- if (executionContext.getRouteContext().isFederated()) {
- return executeFederationQuery();
- }
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionContext();
cacheStatements(executionGroupContext.getInputGroups());
StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchema()).getResource().getDatabaseType(),
@@ -164,13 +165,10 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return
executor.getRegularExecutor().executeQuery(executionGroupContext,
executionContext.getLogicSQL(), callback);
}
- private List<QueryResult> executeFederationQuery() throws SQLException {
- if (executionContext.getExecutionUnits().isEmpty()) {
- return Collections.emptyList();
- }
+ private ResultSet executeFederationQuery(final LogicSQL logicSQL) throws
SQLException {
StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchema()).getResource().getDatabaseType(),
executionContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- return
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
callback, executionContext);
+ return
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
callback, logicSQL);
}
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine() {
@@ -182,7 +180,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Override
public int executeUpdate(final String sql) throws SQLException {
try {
- executionContext = createExecutionContext(sql);
+ executionContext = createExecutionContext(createLogicSQL(sql));
if
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
}
@@ -201,7 +199,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
returnGeneratedKeys = true;
}
try {
- executionContext = createExecutionContext(sql);
+ executionContext = createExecutionContext(createLogicSQL(sql));
if
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
}
@@ -218,7 +216,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public int executeUpdate(final String sql, final int[] columnIndexes)
throws SQLException {
returnGeneratedKeys = true;
try {
- executionContext = createExecutionContext(sql);
+ executionContext = createExecutionContext(createLogicSQL(sql));
if
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
}
@@ -235,7 +233,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public int executeUpdate(final String sql, final String[] columnNames)
throws SQLException {
returnGeneratedKeys = true;
try {
- executionContext = createExecutionContext(sql);
+ executionContext = createExecutionContext(createLogicSQL(sql));
if
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
}
@@ -321,15 +319,17 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private boolean execute0(final String sql, final ExecuteCallback callback)
throws SQLException {
try {
- executionContext = createExecutionContext(sql);
+ LogicSQL logicSQL = createLogicSQL(sql);
+ executionContext = createExecutionContext(logicSQL);
if
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
// TODO process getStatement
Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
+ // TODO move federation route logic to binder
if (executionContext.getRouteContext().isFederated()) {
- List<QueryResult> queryResults = executeFederationQuery();
- return !queryResults.isEmpty();
+ ResultSet resultSet = executeFederationQuery(logicSQL);
+ return null != resultSet;
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionContext();
cacheStatements(executionGroupContext.getInputGroups());
@@ -360,9 +360,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return optionalSQLParserRule.orElse(null);
}
- private ExecutionContext createExecutionContext(final String sql) throws
SQLException {
+ private ExecutionContext createExecutionContext(final LogicSQL logicSQL)
throws SQLException {
clearStatements();
- LogicSQL logicSQL = createLogicSQL(sql);
SQLCheckEngine.check(logicSQL.getSqlStatementContext().getSqlStatement(),
logicSQL.getParameters(),
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules(),
connection.getSchema(), metaDataContexts.getMetaDataMap(), null);
return kernelProcessor.generateExecutionContext(logicSQL,
metaDataContexts.getMetaData(connection.getSchema()),
metaDataContexts.getProps());
@@ -397,6 +396,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
if (null != currentResultSet) {
return currentResultSet;
}
+ if (executionContext.getRouteContext().isFederated()) {
+ return executor.getFederationExecutor().getResultSet();
+ }
if (executionContext.getSqlStatementContext() instanceof
SelectStatementContext ||
executionContext.getSqlStatementContext().getSqlStatement() instanceof
DALStatement) {
List<ResultSet> resultSets = getResultSets();
MergedResult mergedResult =
mergeQuery(getQueryResults(resultSets));
@@ -418,9 +420,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
for (Statement each : statements) {
result.add(each.getResultSet());
}
- if (executionContext.getRouteContext().isFederated()) {
- result.add(executor.getFederationExecutor().getResultSet());
- }
return result;
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 0553def..7869045 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -127,8 +127,8 @@ public final class ContextManager implements AutoCloseable {
return;
}
MetaDataContexts newMetaDataContexts =
buildNewMetaDataContext(schemaName);
-
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().put(schemaName,
-
newMetaDataContexts.getOptimizerContext().getMetaData().getSchemas().get(schemaName));
+
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().put(schemaName,
+
newMetaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().get(schemaName));
metaDataContexts.getMetaDataMap().put(schemaName,
newMetaDataContexts.getMetaData(schemaName));
}
@@ -139,7 +139,7 @@ public final class ContextManager implements AutoCloseable {
*/
public void deleteSchema(final String schemaName) {
if (metaDataContexts.getMetaDataMap().containsKey(schemaName)) {
-
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().remove(schemaName);
+
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().remove(schemaName);
metaDataContexts.getOptimizerContext().getParserContexts().remove(schemaName);
metaDataContexts.getOptimizerContext().getPlannerContexts().remove(schemaName);
ShardingSphereMetaData removeMetaData =
metaDataContexts.getMetaDataMap().remove(schemaName);
@@ -189,7 +189,7 @@ public final class ContextManager implements AutoCloseable {
public void alterRuleConfiguration(final String schemaName, final
Collection<RuleConfiguration> ruleConfigs) {
try {
MetaDataContexts changedMetaDataContexts =
buildChangedMetaDataContext(metaDataContexts.getMetaDataMap().get(schemaName),
ruleConfigs);
-
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().putAll(changedMetaDataContexts.getOptimizerContext().getMetaData().getSchemas());
+
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas());
Map<String, ShardingSphereMetaData> metaDataMap = new
HashMap<>(metaDataContexts.getMetaDataMap());
metaDataMap.putAll(changedMetaDataContexts.getMetaDataMap());
renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
@@ -207,7 +207,7 @@ public final class ContextManager implements AutoCloseable {
public void alterDataSourceConfiguration(final String schemaName, final
Map<String, DataSourceConfiguration> dataSourceConfigurations) {
try {
MetaDataContexts changedMetaDataContext =
buildChangedMetaDataContextWithChangedDataSource(metaDataContexts.getMetaDataMap().get(schemaName),
dataSourceConfigurations);
-
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getMetaData().getSchemas());
+
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
Map<String, ShardingSphereMetaData> metaDataMap = new
HashMap<>(metaDataContexts.getMetaDataMap());
metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
Collection<DataSource> pendingClosedDataSources =
getPendingClosedDataSources(schemaName, dataSourceConfigurations);
@@ -232,8 +232,8 @@ public final class ContextManager implements AutoCloseable {
metaDataContexts.getMetaData(schemaName).getRuleMetaData().getRules()));
Map<String, ShardingSphereMetaData> kernelMetaDataMap = new
HashMap<>(metaDataContexts.getMetaDataMap());
kernelMetaDataMap.put(schemaName, kernelMetaData);
-
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().put(schemaName,
- new FederationSchemaMetaData(schemaName,
SchemaBuilder.buildFederationSchema(tableMetaDataList,
+
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().put(schemaName,
+ new FederationSchemaMetaData(schemaName,
SchemaBuilder.buildKernelSchema(tableMetaDataList,
metaDataContexts.getMetaData(schemaName).getRuleMetaData().getRules()).getTables()));
renewMetaDataContexts(rebuildMetaDataContexts(kernelMetaDataMap));
}
@@ -321,7 +321,7 @@ public final class ContextManager implements AutoCloseable {
Collection<MutableDataNodeRule> rules =
metaDataContexts.getMetaData(schemaName).getRuleMetaData().findRules(MutableDataNodeRule.class);
for (String table : tables) {
metaDataContexts.getMetaData(schemaName).getSchema().remove(table);
-
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().get(schemaName).remove(table);
+
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().get(schemaName).remove(table);
rules.forEach(rule -> rule.remove(table));
}
metaDataContexts.getMetaDataPersistService().ifPresent(optional ->
optional.getSchemaMetaDataService().persist(schemaName,
metaDataContexts.getMetaData(schemaName).getSchema()));
@@ -395,7 +395,7 @@ public final class ContextManager implements AutoCloseable {
private void refreshMetaDataContext(final String schemaName, final
Map<String, DataSourceConfiguration> dataSourceConfigs) throws SQLException {
MetaDataContexts changedMetaDataContext =
buildChangedMetaDataContextWithAddedDataSource(metaDataContexts.getMetaDataMap().get(schemaName),
dataSourceConfigs);
metaDataContexts.getMetaDataMap().putAll(changedMetaDataContext.getMetaDataMap());
-
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getMetaData().getSchemas());
+
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
metaDataContexts.getOptimizerContext().getParserContexts().putAll(changedMetaDataContext.getOptimizerContext().getParserContexts());
metaDataContexts.getOptimizerContext().getPlannerContexts().putAll(changedMetaDataContext.getOptimizerContext().getPlannerContexts());
renewTransactionContext(schemaName,
metaDataContexts.getMetaData(schemaName).getResource());
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
index 6e2fc1f..6bf3d0d 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.mode.metadata;
import lombok.Getter;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
+import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
-import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
-import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import java.util.Collection;
@@ -54,8 +54,8 @@ public final class MetaDataContexts implements AutoCloseable {
private final ConfigurationProperties props;
public MetaDataContexts(final MetaDataPersistService
metaDataPersistService) {
- this(metaDataPersistService, new LinkedHashMap<>(), new
ShardingSphereRuleMetaData(Collections.emptyList(), Collections.emptyList()),
- null, new ConfigurationProperties(new Properties()),
OptimizerContextFactory.create(new HashMap<>()));
+ this(metaDataPersistService, new LinkedHashMap<>(), new
ShardingSphereRuleMetaData(Collections.emptyList(), Collections.emptyList()),
null,
+ new ConfigurationProperties(new Properties()),
OptimizerContextFactory.create(new HashMap<>(), new
ShardingSphereRuleMetaData(Collections.emptyList(), Collections.emptyList())));
}
public MetaDataContexts(final MetaDataPersistService
metaDataPersistService, final Map<String, ShardingSphereMetaData> metaDataMap,
final ShardingSphereRuleMetaData globalRuleMetaData,
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
index f1e7afd..b07ab05 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKe
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRecognizer;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.resource.CachedDatabaseMetaData;
import org.apache.shardingsphere.infra.metadata.resource.DataSourcesMetaData;
@@ -32,7 +33,6 @@ import
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilder;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
-import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.builder.global.GlobalRulesBuilder;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -88,8 +88,7 @@ public final class MetaDataContextsBuilder {
* @return meta data contexts
*/
public MetaDataContexts build(final MetaDataPersistService
metaDataPersistService) throws SQLException {
- Map<String, ShardingSphereMetaData> kernelMetaData = new
HashMap<>(schemaRuleConfigs.size(), 1);
- Map<String, ShardingSphereMetaData> federationMetaData = new
HashMap<>(schemaRuleConfigs.size(), 1);
+ Map<String, ShardingSphereMetaData> metaData = new
HashMap<>(schemaRuleConfigs.size(), 1);
for (String each : schemaRuleConfigs.keySet()) {
Map<String, DataSource> dataSourceMap = dataSources.get(each);
Collection<RuleConfiguration> ruleConfigs =
schemaRuleConfigs.get(each);
@@ -97,11 +96,10 @@ public final class MetaDataContextsBuilder {
ShardingSphereRuleMetaData ruleMetaData = new
ShardingSphereRuleMetaData(ruleConfigs, rules.get(each));
ShardingSphereResource resource = buildResource(databaseType,
dataSourceMap);
Collection<TableMetaData> tableMetaDataList =
schemas.get(each).getTables().values();
- federationMetaData.put(each, new ShardingSphereMetaData(each,
resource, ruleMetaData, SchemaBuilder.buildFederationSchema(tableMetaDataList,
rules.get(each))));
- kernelMetaData.put(each, new ShardingSphereMetaData(each,
resource, ruleMetaData, SchemaBuilder.buildKernelSchema(tableMetaDataList,
rules.get(each))));
+ metaData.put(each, new ShardingSphereMetaData(each, resource,
ruleMetaData, SchemaBuilder.buildKernelSchema(tableMetaDataList,
rules.get(each))));
}
- return new MetaDataContexts(metaDataPersistService, kernelMetaData,
- buildGlobalSchemaMetaData(kernelMetaData), executorEngine,
props, OptimizerContextFactory.create(federationMetaData));
+ ShardingSphereRuleMetaData globalMetaData =
buildGlobalSchemaMetaData(metaData);
+ return new MetaDataContexts(metaDataPersistService, metaData,
globalMetaData, executorEngine, props, OptimizerContextFactory.create(metaData,
globalMetaData));
}
private ShardingSphereRuleMetaData buildGlobalSchemaMetaData(final
Map<String, ShardingSphereMetaData> mataDataMap) {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index 9139964..e3b4948 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -21,19 +21,34 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.context.refresher.MetaDataRefreshEngine;
import org.apache.shardingsphere.infra.database.DefaultSchema;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.metadata.JDBCQueryResultMetaData;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
+import
org.apache.shardingsphere.infra.federation.executor.FederationExecutorFactory;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
+import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
@@ -44,13 +59,17 @@ import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryRespon
import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -83,15 +102,24 @@ public final class DatabaseCommunicationEngine {
private final Collection<ResultSet> cachedResultSets = new
CopyOnWriteArrayList<>();
+ private final FederationExecutor federationExecutor;
+
+ private final JDBCBackendConnection backendConnection;
+
public DatabaseCommunicationEngine(final String driverType, final
ShardingSphereMetaData metaData, final LogicSQL logicSQL, final
JDBCBackendConnection backendConnection) {
this.driverType = driverType;
this.metaData = metaData;
this.logicSQL = logicSQL;
+ this.backendConnection = backendConnection;
proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection,
this);
kernelProcessor = new KernelProcessor();
+ String schemaName =
backendConnection.getConnectionSession().getSchemaName();
metadataRefreshEngine = new MetaDataRefreshEngine(metaData,
-
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getOptimizerContext().getMetaData().getSchemas().get(backendConnection.getConnectionSession().getSchemaName()),
+
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getOptimizerContext().getFederationMetaData().getSchemas().get(schemaName),
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps());
+ MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
+ federationExecutor = FederationExecutorFactory.newInstance(schemaName,
metaDataContexts.getOptimizerContext(),
+ metaDataContexts.getProps(), new
JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(),
backendConnection.isSerialExecute()));
}
/**
@@ -120,6 +148,12 @@ public final class DatabaseCommunicationEngine {
*/
public ResponseHeader execute() throws SQLException {
ExecutionContext executionContext =
kernelProcessor.generateExecutionContext(logicSQL, metaData,
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps());
+ // TODO move federation route logic to binder
+ if (executionContext.getRouteContext().isFederated()) {
+ MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
+ ResultSet resultSet = doExecuteFederation(logicSQL,
metaDataContexts);
+ return processExecuteFederation(resultSet, metaDataContexts);
+ }
if (executionContext.getExecutionUnits().isEmpty()) {
return new
UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement());
}
@@ -131,6 +165,33 @@ public final class DatabaseCommunicationEngine {
: processExecuteUpdate(executionContext,
executeResults.stream().map(each -> (UpdateResult)
each).collect(Collectors.toList()));
}
+ private ResultSet doExecuteFederation(final LogicSQL logicSQL, final
MetaDataContexts metaDataContexts) throws SQLException {
+ boolean isReturnGeneratedKeys =
logicSQL.getSqlStatementContext().getSqlStatement() instanceof
MySQLInsertStatement;
+ DatabaseType databaseType =
metaDataContexts.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getResource().getDatabaseType();
+ ProxyJDBCExecutorCallback callback =
ProxyJDBCExecutorCallbackFactory.newInstance(driverType, databaseType,
+ logicSQL.getSqlStatementContext().getSqlStatement(), this,
isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
+ backendConnection.setFederationExecutor(federationExecutor);
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaDataContexts);
+ return federationExecutor.executeQuery(prepareEngine, callback,
logicSQL);
+ }
+
+ private ResponseHeader processExecuteFederation(final ResultSet resultSet,
final MetaDataContexts metaDataContexts) throws SQLException {
+ int columnCount = resultSet.getMetaData().getColumnCount();
+ queryHeaders = new ArrayList<>(columnCount);
+ for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+ ShardingSphereMetaData metaData =
metaDataContexts.getMetaData(backendConnection.getConnectionSession().getSchemaName());
+ queryHeaders.add(QueryHeaderBuilder.build(new
JDBCQueryResultMetaData(resultSet.getMetaData()), metaData, columnIndex));
+ }
+ mergedResult = new
IteratorStreamMergedResult(Collections.singletonList(new
JDBCStreamQueryResult(resultSet)));
+ return new QueryResponseHeader(queryHeaders);
+ }
+
+ private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final
MetaDataContexts metaData) {
+ int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ return new DriverExecutionPrepareEngine<>(driverType,
maxConnectionsSizePerQuery, backendConnection, new
StatementOption(isReturnGeneratedKeys),
+
metaData.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getRuleMetaData().getRules());
+ }
+
private Collection<ExecuteResult> doExecute(final ExecutionContext
executionContext) throws SQLException {
Collection<ExecuteResult> result =
proxySQLExecutor.execute(executionContext);
refreshMetaData(executionContext);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index e6bb8b0..a870ceb 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -33,19 +33,15 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
-import
org.apache.shardingsphere.infra.federation.executor.FederationExecutorFactory;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.identifier.type.RawExecutionRule;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
-import
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
+import
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
@@ -58,7 +54,6 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Proxy SQL Executor.
@@ -69,25 +64,18 @@ public final class ProxySQLExecutor {
private final JDBCBackendConnection backendConnection;
- private final DatabaseCommunicationEngine databaseCommunicationEngine;
-
private final ProxyJDBCExecutor jdbcExecutor;
private final RawExecutor rawExecutor;
- private final FederationExecutor federationExecutor;
-
public ProxySQLExecutor(final String type, final JDBCBackendConnection
backendConnection, final DatabaseCommunicationEngine
databaseCommunicationEngine) {
this.type = type;
this.backendConnection = backendConnection;
- this.databaseCommunicationEngine = databaseCommunicationEngine;
ExecutorEngine executorEngine =
BackendExecutorContext.getInstance().getExecutorEngine();
boolean isSerialExecute = backendConnection.isSerialExecute();
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
jdbcExecutor = new ProxyJDBCExecutor(type,
backendConnection.getConnectionSession(), databaseCommunicationEngine, new
JDBCExecutor(executorEngine, isSerialExecute));
rawExecutor = new RawExecutor(executorEngine, isSerialExecute,
metaDataContexts.getProps());
- federationExecutor = FederationExecutorFactory.newInstance(
- backendConnection.getConnectionSession().getSchemaName(),
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(), new
JDBCExecutor(executorEngine, isSerialExecute));
}
/**
@@ -133,9 +121,6 @@ public final class ProxySQLExecutor {
if (rules.stream().anyMatch(each -> each instanceof RawExecutionRule))
{
return rawExecute(executionContext, rules,
maxConnectionsSizePerQuery);
}
- if (executionContext.getRouteContext().isFederated()) {
- return federateExecute(executionContext, isReturnGeneratedKeys,
SQLExecutorExceptionHandler.isExceptionThrown());
- }
return useDriverToExecute(executionContext, rules,
maxConnectionsSizePerQuery, isReturnGeneratedKeys,
SQLExecutorExceptionHandler.isExceptionThrown());
}
@@ -153,25 +138,6 @@ public final class ProxySQLExecutor {
return rawExecutor.execute(executionGroupContext,
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
}
- private Collection<ExecuteResult> federateExecute(final ExecutionContext
executionContext, final boolean isReturnGeneratedKeys, final boolean
isExceptionThrown) throws SQLException {
- if (executionContext.getExecutionUnits().isEmpty()) {
- return Collections.emptyList();
- }
- MetaDataContexts metaData =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
- String schemaName =
backendConnection.getConnectionSession().getSchemaName();
- ProxyJDBCExecutorCallback callback =
ProxyJDBCExecutorCallbackFactory.newInstance(type,
metaData.getMetaData(schemaName).getResource().getDatabaseType(),
- executionContext.getSqlStatementContext().getSqlStatement(),
databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, true);
- backendConnection.setFederationExecutor(federationExecutor);
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaData);
- return federationExecutor.executeQuery(prepareEngine, callback,
executionContext).stream().map(each -> (ExecuteResult)
each).collect(Collectors.toList());
- }
-
- private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final
MetaDataContexts metaData) {
- int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new DriverExecutionPrepareEngine<>(type,
maxConnectionsSizePerQuery, backendConnection, new
StatementOption(isReturnGeneratedKeys),
-
metaData.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getRuleMetaData().getRules());
- }
-
private Collection<ExecuteResult> useDriverToExecute(final
ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
final int
maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean
isExceptionThrown) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = new DriverExecutionPrepareEngine<>(
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dataset/dbtbl_with_readwrite_splitting_and_encrypt/select_join_encrypt.xml
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dataset/dbtbl_with_readwrite_splitting_and_encrypt/select_join_encrypt.xml
index 8444e86..113265b 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dataset/dbtbl_with_readwrite_splitting_and_encrypt/select_join_encrypt.xml
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dataset/dbtbl_with_readwrite_splitting_and_encrypt/select_join_encrypt.xml
@@ -22,18 +22,10 @@
<column name="pwd" />
<column name="status" />
<column name="item_id" />
- <column name="user_id" />
- <column name="status" />
+ <column name="user_id0" />
+ <column name="status0" />
<column name="creation_date" />
</metadata>
<row values="0, 10000, a00, init_read, 100000, 0, init_read, 2017-08-08" />
<row values="11, 11001, b11, init_read, 110001, 11, init_read, 2017-08-08"
/>
- <row values="22, 12002, c22, init_read, 120002, 22, init_read, 2017-08-08"
/>
- <row values="33, 13003, d33, init_read, 130003, 33, init_read, 2017-08-08"
/>
- <row values="44, 14004, e44, init_read, 140004, 44, init_read, 2017-08-08"
/>
- <row values="55, 15005, f55, init_read, 150005, 55, init_read, 2017-08-08"
/>
- <row values="66, 16006, g66, init_read, 160006, 66, init_read, 2017-08-08"
/>
- <row values="77, 17007, h77, init_read, 170007, 77, init_read, 2017-08-08"
/>
- <row values="88, 18008, i88, init_read, 180008, 88, init_read, 2017-08-08"
/>
- <row values="99, 19009, j99, init_read, 190009, 99, init_read, 2017-08-08"
/>
</dataset>
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dql-integration-test-cases.xml
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dql-integration-test-cases.xml
index 0eff84c..201bd60 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dql-integration-test-cases.xml
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dql-integration-test-cases.xml
@@ -513,7 +513,7 @@
<assertion expected-data-file="select_in_or_encrypt.xml"/>
</test-case>
- <test-case sql="SELECT * FROM t_user u INNER JOIN t_user_item m ON
u.user_id=m.user_id WHERE u.user_id IN (0, 11, 22, 33, 44, 55, 66, 77, 88, 99)"
scenario-types="encrypt,dbtbl_with_readwrite_splitting_and_encrypt">
+ <test-case sql="SELECT * FROM t_user u INNER JOIN t_user_item m ON
u.user_id=m.user_id WHERE u.user_id IN (0, 11)"
scenario-types="encrypt,dbtbl_with_readwrite_splitting_and_encrypt">
<assertion expected-data-file="select_join_encrypt.xml"/>
</test-case>