This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 29bffb50de1 Fixed the problem of broadcast table execution error when
readwrite_splitting rule exists (#29025)
29bffb50de1 is described below
commit 29bffb50de1692f81414441e85081a08f54262d7
Author: jiangML <[email protected]>
AuthorDate: Mon Nov 13 17:33:41 2023 +0800
Fixed the problem of broadcast table execution error when
readwrite_splitting rule exists (#29025)
* Fixed the problem of broadcast table execution error when
readwrite_splitting rule exists
* optimize code
---
.../broadcast/route/BroadcastSQLRouter.java | 4 +--
.../BroadcastDatabaseBroadcastRoutingEngine.java | 2 +-
.../BroadcastInstanceBroadcastRoutingEngine.java | 2 +-
.../BroadcastTableBroadcastRoutingEngine.java | 4 +--
.../unicast/BroadcastUnicastRoutingEngine.java | 2 +-
.../broadcast/rule/BroadcastRule.java | 41 +++++++++++++++-------
.../rule/builder/BroadcastRuleBuilder.java | 2 +-
...roadcastDatabaseBroadcastRoutingEngineTest.java | 2 +-
...roadcastInstanceBroadcastRoutingEngineTest.java | 2 +-
.../BroadcastTableBroadcastRoutingEngineTest.java | 4 +--
.../unicast/BroadcastUnicastRoutingEngineTest.java | 2 +-
.../YamlDatabaseConfigurationImportExecutor.java | 3 +-
12 files changed, 44 insertions(+), 26 deletions(-)
diff --git
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/BroadcastSQLRouter.java
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/BroadcastSQLRouter.java
index eb67e3b8aa5..44adbff7db4 100644
---
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/BroadcastSQLRouter.java
+++
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/BroadcastSQLRouter.java
@@ -143,7 +143,7 @@ public final class BroadcastSQLRouter implements
SQLRouter<BroadcastRule> {
private void routeToAllDatabaseInstance(final RouteContext routeContext,
final ShardingSphereDatabase database, final BroadcastRule broadcastRule) {
routeContext.getRouteUnits().clear();
- for (String each : broadcastRule.getAvailableDataSourceNames()) {
+ for (String each : broadcastRule.getDataSourceNames()) {
if
(database.getResourceMetaData().getAllInstanceDataSourceNames().contains(each))
{
routeContext.getRouteUnits().add(new RouteUnit(new
RouteMapper(each, each), Collections.emptyList()));
}
@@ -152,7 +152,7 @@ public final class BroadcastSQLRouter implements
SQLRouter<BroadcastRule> {
private void routeToAllDatabase(final RouteContext routeContext, final
BroadcastRule broadcastRule) {
routeContext.getRouteUnits().clear();
- for (String each : broadcastRule.getAvailableDataSourceNames()) {
+ for (String each : broadcastRule.getDataSourceNames()) {
routeContext.getRouteUnits().add(new RouteUnit(new
RouteMapper(each, each), Collections.emptyList()));
}
}
diff --git
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngine.java
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngine.java
index f306ff9be01..97ecab07612 100644
---
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngine.java
+++
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngine.java
@@ -32,7 +32,7 @@ public final class BroadcastDatabaseBroadcastRoutingEngine
implements BroadcastR
@Override
public RouteContext route(final RouteContext routeContext, final
BroadcastRule broadcastRule) {
- for (String each : broadcastRule.getAvailableDataSourceNames()) {
+ for (String each : broadcastRule.getDataSourceNames()) {
routeContext.getRouteUnits().add(new RouteUnit(new
RouteMapper(each, each), Collections.emptyList()));
}
return routeContext;
diff --git
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
index f7280a3934b..ef3768deb64 100644
---
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
+++
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
@@ -38,7 +38,7 @@ public class BroadcastInstanceBroadcastRoutingEngine
implements BroadcastRouteEn
@Override
public RouteContext route(final RouteContext routeContext, final
BroadcastRule broadcastRule) {
RouteContext result = new RouteContext();
- for (String each : broadcastRule.getAvailableDataSourceNames()) {
+ for (String each : broadcastRule.getDataSourceNames()) {
if
(resourceMetaData.getAllInstanceDataSourceNames().contains(each)) {
result.getRouteUnits().add(new RouteUnit(new RouteMapper(each,
each), Collections.emptyList()));
}
diff --git
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngine.java
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngine.java
index 9ea7b5339fd..eb4b295dc99 100644
---
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngine.java
+++
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngine.java
@@ -49,7 +49,7 @@ public final class BroadcastTableBroadcastRoutingEngine
implements BroadcastRout
private RouteContext getRouteContext(final BroadcastRule broadcastRule) {
RouteContext result = new RouteContext();
- for (String each : broadcastRule.getAvailableDataSourceNames()) {
+ for (String each : broadcastRule.getDataSourceNames()) {
result.getRouteUnits().add(new RouteUnit(new RouteMapper(each,
each), Collections.singletonList(new RouteMapper("", ""))));
}
return result;
@@ -58,7 +58,7 @@ public final class BroadcastTableBroadcastRoutingEngine
implements BroadcastRout
private RouteContext getRouteContext(final BroadcastRule broadcastRule,
final Collection<String> logicTableNames) {
RouteContext result = new RouteContext();
Collection<RouteMapper> tableRouteMappers =
getTableRouteMappers(logicTableNames);
- for (String each : broadcastRule.getAvailableDataSourceNames()) {
+ for (String each : broadcastRule.getDataSourceNames()) {
RouteMapper dataSourceMapper = new RouteMapper(each, each);
result.getRouteUnits().add(new RouteUnit(dataSourceMapper,
tableRouteMappers));
}
diff --git
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngine.java
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngine.java
index 07188a42795..89ced5d87c9 100644
---
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngine.java
+++
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngine.java
@@ -50,7 +50,7 @@ public final class BroadcastUnicastRoutingEngine implements
BroadcastRouteEngine
@Override
public RouteContext route(final RouteContext routeContext, final
BroadcastRule broadcastRule) {
- RouteMapper dataSourceMapper =
getDataSourceRouteMapper(broadcastRule.getAvailableDataSourceNames());
+ RouteMapper dataSourceMapper =
getDataSourceRouteMapper(broadcastRule.getDataSourceNames());
routeContext.getRouteUnits().add(new RouteUnit(dataSourceMapper,
createTableRouteMappers()));
return routeContext;
}
diff --git
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/BroadcastRule.java
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/BroadcastRule.java
index 58892c0ac29..ac2532aaf16 100644
---
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/BroadcastRule.java
+++
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/BroadcastRule.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.broadcast.rule;
import lombok.Getter;
import
org.apache.shardingsphere.broadcast.api.config.BroadcastRuleConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
+import
org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.TableContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.TableNamesMapper;
@@ -31,6 +33,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -53,17 +56,39 @@ public final class BroadcastRule implements DatabaseRule,
DataNodeContainedRule,
private final TableNamesMapper logicalTableMapper;
- public BroadcastRule(final BroadcastRuleConfiguration config, final String
databaseName, final Map<String, DataSource> dataSources) {
+ public BroadcastRule(final BroadcastRuleConfiguration config, final String
databaseName, final Map<String, DataSource> dataSources, final
Collection<ShardingSphereRule> builtRules) {
configuration = config;
this.databaseName = databaseName;
- dataSourceNames = getDataSourceNames(dataSources);
+ dataSourceNames = getAggregatedDataSourceNames(dataSources,
builtRules);
tables = createBroadcastTables(config.getTables());
logicalTableMapper = createTableMapper();
tableDataNodes = createShardingTableDataNodes(dataSourceNames, tables);
}
- private Collection<String> getDataSourceNames(final Map<String,
DataSource> dataSources) {
- return new LinkedList<>(dataSources.keySet());
+ private Collection<String> getAggregatedDataSourceNames(final Map<String,
DataSource> dataSources, final Collection<ShardingSphereRule> builtRules) {
+ Collection<String> result = new LinkedList<>(dataSources.keySet());
+ for (ShardingSphereRule each : builtRules) {
+ if (each instanceof DataSourceContainedRule) {
+ result = getAggregatedDataSourceNames(result,
(DataSourceContainedRule) each);
+ }
+ }
+ return result;
+ }
+
+ private Collection<String> getAggregatedDataSourceNames(final
Collection<String> dataSourceNames, final DataSourceContainedRule builtRule) {
+ Collection<String> result = new LinkedList<>();
+ for (Entry<String, Collection<String>> entry :
builtRule.getDataSourceMapper().entrySet()) {
+ for (String each : entry.getValue()) {
+ if (dataSourceNames.contains(each)) {
+ dataSourceNames.remove(each);
+ if (!result.contains(entry.getKey())) {
+ result.add(entry.getKey());
+ }
+ }
+ }
+ }
+ result.addAll(dataSourceNames);
+ return result;
}
private Collection<String> createBroadcastTables(final Collection<String>
broadcastTables) {
@@ -150,14 +175,6 @@ public final class BroadcastRule implements DatabaseRule,
DataNodeContainedRule,
return !logicTableNames.isEmpty() &&
tables.containsAll(logicTableNames);
}
- /**
- * Get available datasource names.
- * @return datasource names
- */
- public Collection<String> getAvailableDataSourceNames() {
- return dataSourceNames;
- }
-
@Override
public TableNamesMapper getLogicTableMapper() {
return logicalTableMapper;
diff --git
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/builder/BroadcastRuleBuilder.java
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/builder/BroadcastRuleBuilder.java
index 1d1365af371..5060e6afdc9 100644
---
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/builder/BroadcastRuleBuilder.java
+++
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/builder/BroadcastRuleBuilder.java
@@ -37,7 +37,7 @@ public final class BroadcastRuleBuilder implements
DatabaseRuleBuilder<Broadcast
@Override
public BroadcastRule build(final BroadcastRuleConfiguration config, final
String databaseName, final DatabaseType protocolType,
final Map<String, DataSource> dataSources,
final Collection<ShardingSphereRule> builtRules, final InstanceContext
instanceContext) {
- return new BroadcastRule(config, databaseName, dataSources);
+ return new BroadcastRule(config, databaseName, dataSources,
builtRules);
}
@Override
diff --git
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngineTest.java
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngineTest.java
index e5203efb2bf..614258e8a09 100644
---
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngineTest.java
+++
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngineTest.java
@@ -35,7 +35,7 @@ class BroadcastDatabaseBroadcastRoutingEngineTest {
@Test
void assertRoute() {
BroadcastRule broadcastRule = mock(BroadcastRule.class);
-
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
+
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
BroadcastDatabaseBroadcastRoutingEngine engine = new
BroadcastDatabaseBroadcastRoutingEngine();
RouteContext routeContext = engine.route(new RouteContext(),
broadcastRule);
assertThat(routeContext.getRouteUnits().size(), is(2));
diff --git
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngineTest.java
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngineTest.java
index 081678311e1..b354b6a4b21 100644
---
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngineTest.java
+++
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngineTest.java
@@ -39,7 +39,7 @@ class BroadcastInstanceBroadcastRoutingEngineTest {
when(resourceMetaData.getAllInstanceDataSourceNames()).thenReturn(Collections.singleton("ds_0"));
BroadcastInstanceBroadcastRoutingEngine engine = new
BroadcastInstanceBroadcastRoutingEngine(resourceMetaData);
BroadcastRule broadcastRule = mock(BroadcastRule.class);
-
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
+
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
RouteContext routeContext = engine.route(new RouteContext(),
broadcastRule);
assertThat(routeContext.getRouteUnits().size(), is(1));
assertDataSourceRouteMapper(routeContext.getRouteUnits().iterator().next(),
"ds_0");
diff --git
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngineTest.java
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngineTest.java
index 1e1de22d899..89399b1a57f 100644
---
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngineTest.java
+++
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngineTest.java
@@ -41,7 +41,7 @@ class BroadcastTableBroadcastRoutingEngineTest {
Collection<String> broadcastRuleTableNames =
Collections.singleton("t_address");
BroadcastTableBroadcastRoutingEngine engine = new
BroadcastTableBroadcastRoutingEngine(broadcastRuleTableNames);
BroadcastRule broadcastRule = mock(BroadcastRule.class);
-
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
+
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
when(broadcastRule.getBroadcastRuleTableNames(any())).thenReturn(Collections.singleton("t_address"));
RouteContext routeContext = engine.route(new RouteContext(),
broadcastRule);
assertThat(routeContext.getRouteUnits().size(), is(2));
@@ -55,7 +55,7 @@ class BroadcastTableBroadcastRoutingEngineTest {
Collection<String> broadcastRuleTableNames =
Collections.singleton("t_address");
BroadcastTableBroadcastRoutingEngine engine = new
BroadcastTableBroadcastRoutingEngine(broadcastRuleTableNames);
BroadcastRule broadcastRule = mock(BroadcastRule.class);
-
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
+
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
when(broadcastRule.getBroadcastRuleTableNames(any())).thenReturn(Collections.emptyList());
RouteContext routeContext = engine.route(new RouteContext(),
broadcastRule);
assertThat(routeContext.getRouteUnits().size(), is(2));
diff --git
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngineTest.java
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngineTest.java
index a8ea2ec2092..522a87f0c08 100644
---
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngineTest.java
+++
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngineTest.java
@@ -42,7 +42,7 @@ class BroadcastUnicastRoutingEngineTest {
@BeforeEach
void setUp() {
broadcastRule = mock(BroadcastRule.class);
-
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
+
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0",
"ds_1"));
}
@Test
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/util/YamlDatabaseConfigurationImportExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/util/YamlDatabaseConfigurationImportExecutor.java
index 2adc0108c3c..9df70f3f0c8 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/util/YamlDatabaseConfigurationImportExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/util/YamlDatabaseConfigurationImportExecutor.java
@@ -286,7 +286,8 @@ public final class YamlDatabaseConfigurationImportExecutor {
allRuleConfigs.add(broadcastRuleConfig);
database.getRuleMetaData().getRules().add(new
BroadcastRule(broadcastRuleConfig, database.getName(),
database.getResourceMetaData().getStorageUnits().entrySet().stream()
- .collect(Collectors.toMap(Entry::getKey, entry ->
entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue,
LinkedHashMap::new))));
+ .collect(Collectors.toMap(Entry::getKey, entry ->
entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue,
LinkedHashMap::new)),
+ database.getRuleMetaData().getRules()));
}
private void addSingleRuleConfiguration(final SingleRuleConfiguration
singleRuleConfig, final Collection<RuleConfiguration> allRuleConfigs, final
ShardingSphereDatabase database) {