This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f5cf296 Support sharding join SQLs using calcite executor (#9534)
f5cf296 is described below
commit f5cf2963a4a76d3f860bbd42cee96a7a59b5502e
Author: Juan Pan(Trista) <[email protected]>
AuthorDate: Mon Mar 1 11:58:48 2021 +0800
Support sharding join SQLs using calcite executor (#9534)
---
.../engine/type/ShardingRouteEngineFactory.java | 30 +++++++++++++++-------
.../ShardingFederatedRoutingEngine.java} | 27 +++++++++----------
.../statement/dml/SelectStatementContext.java | 10 ++++++++
.../infra/route/context/RouteContext.java | 17 ++++++++++++
4 files changed, 60 insertions(+), 24 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/ShardingRouteEngineFactory.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/ShardingRouteEngineFactory.java
index a7b1137..e9119cd 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/ShardingRouteEngineFactory.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/ShardingRouteEngineFactory.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.sharding.route.engine.type;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -30,7 +31,7 @@ import
org.apache.shardingsphere.sharding.route.engine.type.broadcast.ShardingIn
import
org.apache.shardingsphere.sharding.route.engine.type.broadcast.ShardingTableBroadcastRoutingEngine;
import
org.apache.shardingsphere.sharding.route.engine.type.complex.ShardingComplexRoutingEngine;
import
org.apache.shardingsphere.sharding.route.engine.type.ignore.ShardingIgnoreRoutingEngine;
-import
org.apache.shardingsphere.sharding.route.engine.type.single.ShardingSingleRoutingEngine;
+import
org.apache.shardingsphere.sharding.route.engine.type.federated.ShardingFederatedRoutingEngine;
import
org.apache.shardingsphere.sharding.route.engine.type.single.SingleTableRoutingEngine;
import
org.apache.shardingsphere.sharding.route.engine.type.standard.ShardingStandardRoutingEngine;
import
org.apache.shardingsphere.sharding.route.engine.type.unicast.ShardingUnicastRoutingEngine;
@@ -63,7 +64,7 @@ public final class ShardingRouteEngineFactory {
/**
* Create new instance of routing engine.
- *
+ *
* @param shardingRule sharding rule
* @param metaData ShardingSphere metaData
* @param sqlStatementContext SQL statement context
@@ -71,7 +72,7 @@ public final class ShardingRouteEngineFactory {
* @param props ShardingSphere properties
* @return new instance of routing engine
*/
- public static ShardingRouteEngine newInstance(final ShardingRule
shardingRule, final ShardingSphereMetaData metaData,
+ public static ShardingRouteEngine newInstance(final ShardingRule
shardingRule, final ShardingSphereMetaData metaData,
final SQLStatementContext<?>
sqlStatementContext, final ShardingConditions shardingConditions, final
ConfigurationProperties props) {
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
Collection<String> tableNames =
sqlStatementContext.getTablesContext().getTableNames();
@@ -82,7 +83,7 @@ public final class ShardingRouteEngineFactory {
return getDDLRoutingEngine(shardingRule, metaData,
sqlStatementContext);
}
if (sqlStatement instanceof DALStatement) {
- return getDALRoutingEngine(shardingRule, metaData, sqlStatement,
tableNames);
+ return getDALRoutingEngine(shardingRule, sqlStatement, tableNames);
}
if (sqlStatement instanceof DCLStatement) {
return getDCLRoutingEngine(shardingRule, metaData,
sqlStatementContext);
@@ -104,8 +105,7 @@ public final class ShardingRouteEngineFactory {
return new ShardingTableBroadcastRoutingEngine(metaData.getSchema(),
sqlStatementContext);
}
- private static ShardingRouteEngine getDALRoutingEngine(final ShardingRule
shardingRule, final ShardingSphereMetaData metaData,
- final SQLStatement
sqlStatement, final Collection<String> tableNames) {
+ private static ShardingRouteEngine getDALRoutingEngine(final ShardingRule
shardingRule, final SQLStatement sqlStatement, final Collection<String>
tableNames) {
if (sqlStatement instanceof MySQLUseStatement) {
return new ShardingIgnoreRoutingEngine();
}
@@ -153,18 +153,30 @@ public final class ShardingRouteEngineFactory {
return new SingleTableRoutingEngine(tableNames, sqlStatement);
}
if (shardingRule.tableRuleExists(tableNames) &&
shardingRule.singleTableRuleExists(tableNames)) {
- return new ShardingSingleRoutingEngine(tableNames);
+ return new ShardingFederatedRoutingEngine(tableNames);
}
- return getShardingRoutingEngine(shardingRule, shardingConditions,
tableNames, props);
+ return getShardingRoutingEngine(shardingRule, shardingConditions,
sqlStatementContext, tableNames, props);
}
- private static ShardingRouteEngine getShardingRoutingEngine(final
ShardingRule shardingRule, final ShardingConditions shardingConditions,
+ private static ShardingRouteEngine getShardingRoutingEngine(final
ShardingRule shardingRule, final ShardingConditions shardingConditions,
+ final
SQLStatementContext<?> sqlStatementContext,
final
Collection<String> tableNames, final ConfigurationProperties props) {
Collection<String> shardingTableNames =
shardingRule.getShardingLogicTableNames(tableNames);
if (1 == shardingTableNames.size() ||
shardingRule.isAllBindingTables(shardingTableNames)) {
return new
ShardingStandardRoutingEngine(shardingTableNames.iterator().next(),
shardingConditions, props);
}
+ if (isFederatedQuery(sqlStatementContext, tableNames,
shardingTableNames)) {
+ return new ShardingFederatedRoutingEngine(tableNames);
+ }
// TODO config for cartesian set
return new ShardingComplexRoutingEngine(tableNames,
shardingConditions, props);
}
+
+ private static boolean isFederatedQuery(final SQLStatementContext<?>
sqlStatementContext, final Collection<String> tableNames, final
Collection<String> shardingTableNames) {
+ if (!(sqlStatementContext instanceof SelectStatementContext)) {
+ return false;
+ }
+ SelectStatementContext select = (SelectStatementContext)
sqlStatementContext;
+ return tableNames.size() == shardingTableNames.size() &&
(select.isContainsJoinQuery() || select.isContainsSubquery());
+ }
}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/single/ShardingSingleRoutingEngine.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/federated/ShardingFederatedRoutingEngine.java
similarity index 61%
rename from
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/single/ShardingSingleRoutingEngine.java
rename to
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/federated/ShardingFederatedRoutingEngine.java
index 7ec9aee..47da4e5 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/single/ShardingSingleRoutingEngine.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/federated/ShardingFederatedRoutingEngine.java
@@ -15,26 +15,23 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.sharding.route.engine.type.single;
+package org.apache.shardingsphere.sharding.route.engine.type.federated;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.route.context.RouteMapper;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
import
org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngine;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.sharding.rule.TableRule;
import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
/**
- * Sharding single routing engine.
+ * Sharding federated routing engine.
*/
@RequiredArgsConstructor
-public final class ShardingSingleRoutingEngine implements ShardingRouteEngine {
+public final class ShardingFederatedRoutingEngine implements
ShardingRouteEngine {
private final Collection<String> logicTables;
@@ -42,23 +39,23 @@ public final class ShardingSingleRoutingEngine implements
ShardingRouteEngine {
public void route(final RouteContext routeContext, final ShardingRule
shardingRule) {
for (String each : logicTables) {
if (shardingRule.getSingleTableRules().containsKey(each)) {
- String datasource =
shardingRule.getSingleTableRules().get(each).getDataSourceName();
- RouteUnit unit = new RouteUnit(new RouteMapper(datasource,
datasource), Collections.singletonList(new RouteMapper(each, each)));
- routeContext.getRouteUnits().add(unit);
+ String dataSourceName =
shardingRule.getSingleTableRules().get(each).getDataSourceName();
+ RouteMapper dataSource = new RouteMapper(dataSourceName,
dataSourceName);
+ RouteMapper table = new RouteMapper(each, each);
+ routeContext.putRouteUnit(dataSource, table);
} else {
-
routeContext.getRouteUnits().addAll(getAllRouteUnits(shardingRule, each));
+ fillRouteContext(routeContext, shardingRule, each);
}
}
routeContext.setToCalcite(true);
}
- private Collection<RouteUnit> getAllRouteUnits(final ShardingRule
shardingRule, final String logicTableName) {
- Collection<RouteUnit> result = new LinkedList<>();
+ private void fillRouteContext(final RouteContext routeContext, final
ShardingRule shardingRule, final String logicTableName) {
TableRule tableRule = shardingRule.getTableRule(logicTableName);
for (DataNode each : tableRule.getActualDataNodes()) {
- RouteUnit routeUnit = new RouteUnit(new
RouteMapper(each.getDataSourceName(), each.getDataSourceName()),
Collections.singletonList(new RouteMapper(logicTableName,
each.getTableName())));
- result.add(routeUnit);
+ RouteMapper dataSource = new RouteMapper(each.getDataSourceName(),
each.getDataSourceName());
+ RouteMapper table = new RouteMapper(logicTableName,
each.getTableName());
+ routeContext.putRouteUnit(dataSource, table);
}
- return result;
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/dml/SelectStatementContext.java
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/dml/SelectStatementContext.java
index 2b8ea7d..00aa1dd 100644
---
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/dml/SelectStatementContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/dml/SelectStatementContext.java
@@ -41,6 +41,7 @@ import
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.order.item.Ex
import
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.order.item.IndexOrderByItemSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.order.item.TextOrderByItemSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.predicate.WhereSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.JoinTableSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sql.parser.sql.common.util.SQLUtil;
@@ -102,6 +103,15 @@ public final class SelectStatementContext extends
CommonSQLStatementContext<Sele
}
/**
+ * Whether it contains join query.
+ *
+ * @return contains join query or not
+ */
+ public boolean isContainsJoinQuery() {
+ return getSqlStatement().getFrom() instanceof JoinTableSegment;
+ }
+
+ /**
* Set indexes.
*
* @param columnLabelIndexMap map for column label and index
diff --git
a/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
b/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
index 7b05ad7..0ed5ff7 100644
---
a/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
@@ -135,4 +135,21 @@ public final class RouteContext {
}
return Optional.empty();
}
+
+ /**
+ * Put route unit.
+ *
+ * @param dataSourceMapper data source mapper
+ * @param tableMapper table mapper
+ */
+ public void putRouteUnit(final RouteMapper dataSourceMapper, final
RouteMapper tableMapper) {
+ Optional<RouteUnit> target = routeUnits.stream().filter(unit ->
unit.getDataSourceMapper().equals(dataSourceMapper)).findFirst();
+ RouteUnit unit = new RouteUnit(dataSourceMapper, new
LinkedHashSet<>());
+ if (target.isPresent()) {
+ unit.getTableMappers().addAll(target.get().getTableMappers());
+ routeUnits.remove(target.get());
+ }
+ unit.getTableMappers().add(tableMapper);
+ routeUnits.add(unit);
+ }
}