This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 29bffb50de1 Fixed the problem of broadcast table execution error when readwrite_splitting rule exists (#29025)
29bffb50de1 is described below

commit 29bffb50de1692f81414441e85081a08f54262d7
Author: jiangML <[email protected]>
AuthorDate: Mon Nov 13 17:33:41 2023 +0800

    Fixed the problem of broadcast table execution error when readwrite_splitting rule exists (#29025)
    
    * Fixed the problem of broadcast table execution error when readwrite_splitting rule exists
    
    * optimize code
---
 .../broadcast/route/BroadcastSQLRouter.java        |  4 +--
 .../BroadcastDatabaseBroadcastRoutingEngine.java   |  2 +-
 .../BroadcastInstanceBroadcastRoutingEngine.java   |  2 +-
 .../BroadcastTableBroadcastRoutingEngine.java      |  4 +--
 .../unicast/BroadcastUnicastRoutingEngine.java     |  2 +-
 .../broadcast/rule/BroadcastRule.java              | 41 +++++++++++++++-------
 .../rule/builder/BroadcastRuleBuilder.java         |  2 +-
 ...roadcastDatabaseBroadcastRoutingEngineTest.java |  2 +-
 ...roadcastInstanceBroadcastRoutingEngineTest.java |  2 +-
 .../BroadcastTableBroadcastRoutingEngineTest.java  |  4 +--
 .../unicast/BroadcastUnicastRoutingEngineTest.java |  2 +-
 .../YamlDatabaseConfigurationImportExecutor.java   |  3 +-
 12 files changed, 44 insertions(+), 26 deletions(-)

diff --git 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/BroadcastSQLRouter.java
 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/BroadcastSQLRouter.java
index eb67e3b8aa5..44adbff7db4 100644
--- 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/BroadcastSQLRouter.java
+++ 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/BroadcastSQLRouter.java
@@ -143,7 +143,7 @@ public final class BroadcastSQLRouter implements 
SQLRouter<BroadcastRule> {
     
     private void routeToAllDatabaseInstance(final RouteContext routeContext, 
final ShardingSphereDatabase database, final BroadcastRule broadcastRule) {
         routeContext.getRouteUnits().clear();
-        for (String each : broadcastRule.getAvailableDataSourceNames()) {
+        for (String each : broadcastRule.getDataSourceNames()) {
             if 
(database.getResourceMetaData().getAllInstanceDataSourceNames().contains(each)) 
{
                 routeContext.getRouteUnits().add(new RouteUnit(new 
RouteMapper(each, each), Collections.emptyList()));
             }
@@ -152,7 +152,7 @@ public final class BroadcastSQLRouter implements 
SQLRouter<BroadcastRule> {
     
     private void routeToAllDatabase(final RouteContext routeContext, final 
BroadcastRule broadcastRule) {
         routeContext.getRouteUnits().clear();
-        for (String each : broadcastRule.getAvailableDataSourceNames()) {
+        for (String each : broadcastRule.getDataSourceNames()) {
             routeContext.getRouteUnits().add(new RouteUnit(new 
RouteMapper(each, each), Collections.emptyList()));
         }
     }
diff --git 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngine.java
 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngine.java
index f306ff9be01..97ecab07612 100644
--- 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngine.java
+++ 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngine.java
@@ -32,7 +32,7 @@ public final class BroadcastDatabaseBroadcastRoutingEngine 
implements BroadcastR
     
     @Override
     public RouteContext route(final RouteContext routeContext, final 
BroadcastRule broadcastRule) {
-        for (String each : broadcastRule.getAvailableDataSourceNames()) {
+        for (String each : broadcastRule.getDataSourceNames()) {
             routeContext.getRouteUnits().add(new RouteUnit(new 
RouteMapper(each, each), Collections.emptyList()));
         }
         return routeContext;
diff --git 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
index f7280a3934b..ef3768deb64 100644
--- 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
+++ 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngine.java
@@ -38,7 +38,7 @@ public class BroadcastInstanceBroadcastRoutingEngine 
implements BroadcastRouteEn
     @Override
     public RouteContext route(final RouteContext routeContext, final 
BroadcastRule broadcastRule) {
         RouteContext result = new RouteContext();
-        for (String each : broadcastRule.getAvailableDataSourceNames()) {
+        for (String each : broadcastRule.getDataSourceNames()) {
             if 
(resourceMetaData.getAllInstanceDataSourceNames().contains(each)) {
                 result.getRouteUnits().add(new RouteUnit(new RouteMapper(each, 
each), Collections.emptyList()));
             }
diff --git 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngine.java
 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngine.java
index 9ea7b5339fd..eb4b295dc99 100644
--- 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngine.java
+++ 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngine.java
@@ -49,7 +49,7 @@ public final class BroadcastTableBroadcastRoutingEngine 
implements BroadcastRout
     
     private RouteContext getRouteContext(final BroadcastRule broadcastRule) {
         RouteContext result = new RouteContext();
-        for (String each : broadcastRule.getAvailableDataSourceNames()) {
+        for (String each : broadcastRule.getDataSourceNames()) {
             result.getRouteUnits().add(new RouteUnit(new RouteMapper(each, 
each), Collections.singletonList(new RouteMapper("", ""))));
         }
         return result;
@@ -58,7 +58,7 @@ public final class BroadcastTableBroadcastRoutingEngine 
implements BroadcastRout
     private RouteContext getRouteContext(final BroadcastRule broadcastRule, 
final Collection<String> logicTableNames) {
         RouteContext result = new RouteContext();
         Collection<RouteMapper> tableRouteMappers = 
getTableRouteMappers(logicTableNames);
-        for (String each : broadcastRule.getAvailableDataSourceNames()) {
+        for (String each : broadcastRule.getDataSourceNames()) {
             RouteMapper dataSourceMapper = new RouteMapper(each, each);
             result.getRouteUnits().add(new RouteUnit(dataSourceMapper, 
tableRouteMappers));
         }
diff --git 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngine.java
 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngine.java
index 07188a42795..89ced5d87c9 100644
--- 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngine.java
+++ 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngine.java
@@ -50,7 +50,7 @@ public final class BroadcastUnicastRoutingEngine implements 
BroadcastRouteEngine
     
     @Override
     public RouteContext route(final RouteContext routeContext, final 
BroadcastRule broadcastRule) {
-        RouteMapper dataSourceMapper = 
getDataSourceRouteMapper(broadcastRule.getAvailableDataSourceNames());
+        RouteMapper dataSourceMapper = 
getDataSourceRouteMapper(broadcastRule.getDataSourceNames());
         routeContext.getRouteUnits().add(new RouteUnit(dataSourceMapper, 
createTableRouteMappers()));
         return routeContext;
     }
diff --git 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/BroadcastRule.java
 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/BroadcastRule.java
index 58892c0ac29..ac2532aaf16 100644
--- 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/BroadcastRule.java
+++ 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/BroadcastRule.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.broadcast.rule;
 import lombok.Getter;
 import 
org.apache.shardingsphere.broadcast.api.config.BroadcastRuleConfiguration;
 import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.TableContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.TableNamesMapper;
 
@@ -31,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -53,17 +56,39 @@ public final class BroadcastRule implements DatabaseRule, 
DataNodeContainedRule,
     
     private final TableNamesMapper logicalTableMapper;
     
-    public BroadcastRule(final BroadcastRuleConfiguration config, final String 
databaseName, final Map<String, DataSource> dataSources) {
+    public BroadcastRule(final BroadcastRuleConfiguration config, final String 
databaseName, final Map<String, DataSource> dataSources, final 
Collection<ShardingSphereRule> builtRules) {
         configuration = config;
         this.databaseName = databaseName;
-        dataSourceNames = getDataSourceNames(dataSources);
+        dataSourceNames = getAggregatedDataSourceNames(dataSources, 
builtRules);
         tables = createBroadcastTables(config.getTables());
         logicalTableMapper = createTableMapper();
         tableDataNodes = createShardingTableDataNodes(dataSourceNames, tables);
     }
     
-    private Collection<String> getDataSourceNames(final Map<String, 
DataSource> dataSources) {
-        return new LinkedList<>(dataSources.keySet());
+    private Collection<String> getAggregatedDataSourceNames(final Map<String, 
DataSource> dataSources, final Collection<ShardingSphereRule> builtRules) {
+        Collection<String> result = new LinkedList<>(dataSources.keySet());
+        for (ShardingSphereRule each : builtRules) {
+            if (each instanceof DataSourceContainedRule) {
+                result = getAggregatedDataSourceNames(result, 
(DataSourceContainedRule) each);
+            }
+        }
+        return result;
+    }
+    
+    private Collection<String> getAggregatedDataSourceNames(final 
Collection<String> dataSourceNames, final DataSourceContainedRule builtRule) {
+        Collection<String> result = new LinkedList<>();
+        for (Entry<String, Collection<String>> entry : 
builtRule.getDataSourceMapper().entrySet()) {
+            for (String each : entry.getValue()) {
+                if (dataSourceNames.contains(each)) {
+                    dataSourceNames.remove(each);
+                    if (!result.contains(entry.getKey())) {
+                        result.add(entry.getKey());
+                    }
+                }
+            }
+        }
+        result.addAll(dataSourceNames);
+        return result;
     }
     
     private Collection<String> createBroadcastTables(final Collection<String> 
broadcastTables) {
@@ -150,14 +175,6 @@ public final class BroadcastRule implements DatabaseRule, 
DataNodeContainedRule,
         return !logicTableNames.isEmpty() && 
tables.containsAll(logicTableNames);
     }
     
-    /**
-     * Get available datasource names.
-     * @return datasource names
-     */
-    public Collection<String> getAvailableDataSourceNames() {
-        return dataSourceNames;
-    }
-    
     @Override
     public TableNamesMapper getLogicTableMapper() {
         return logicalTableMapper;
diff --git 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/builder/BroadcastRuleBuilder.java
 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/builder/BroadcastRuleBuilder.java
index 1d1365af371..5060e6afdc9 100644
--- 
a/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/builder/BroadcastRuleBuilder.java
+++ 
b/features/broadcast/core/src/main/java/org/apache/shardingsphere/broadcast/rule/builder/BroadcastRuleBuilder.java
@@ -37,7 +37,7 @@ public final class BroadcastRuleBuilder implements 
DatabaseRuleBuilder<Broadcast
     @Override
     public BroadcastRule build(final BroadcastRuleConfiguration config, final 
String databaseName, final DatabaseType protocolType,
                                final Map<String, DataSource> dataSources, 
final Collection<ShardingSphereRule> builtRules, final InstanceContext 
instanceContext) {
-        return new BroadcastRule(config, databaseName, dataSources);
+        return new BroadcastRule(config, databaseName, dataSources, 
builtRules);
     }
     
     @Override
diff --git 
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngineTest.java
 
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngineTest.java
index e5203efb2bf..614258e8a09 100644
--- 
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngineTest.java
+++ 
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastDatabaseBroadcastRoutingEngineTest.java
@@ -35,7 +35,7 @@ class BroadcastDatabaseBroadcastRoutingEngineTest {
     @Test
     void assertRoute() {
         BroadcastRule broadcastRule = mock(BroadcastRule.class);
-        
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
 "ds_1"));
+        
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", 
"ds_1"));
         BroadcastDatabaseBroadcastRoutingEngine engine = new 
BroadcastDatabaseBroadcastRoutingEngine();
         RouteContext routeContext = engine.route(new RouteContext(), 
broadcastRule);
         assertThat(routeContext.getRouteUnits().size(), is(2));
diff --git 
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngineTest.java
 
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngineTest.java
index 081678311e1..b354b6a4b21 100644
--- 
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngineTest.java
+++ 
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastInstanceBroadcastRoutingEngineTest.java
@@ -39,7 +39,7 @@ class BroadcastInstanceBroadcastRoutingEngineTest {
         
when(resourceMetaData.getAllInstanceDataSourceNames()).thenReturn(Collections.singleton("ds_0"));
         BroadcastInstanceBroadcastRoutingEngine engine = new 
BroadcastInstanceBroadcastRoutingEngine(resourceMetaData);
         BroadcastRule broadcastRule = mock(BroadcastRule.class);
-        
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
 "ds_1"));
+        
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", 
"ds_1"));
         RouteContext routeContext = engine.route(new RouteContext(), 
broadcastRule);
         assertThat(routeContext.getRouteUnits().size(), is(1));
         
assertDataSourceRouteMapper(routeContext.getRouteUnits().iterator().next(), 
"ds_0");
diff --git 
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngineTest.java
 
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngineTest.java
index 1e1de22d899..89399b1a57f 100644
--- 
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngineTest.java
+++ 
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/broadcast/BroadcastTableBroadcastRoutingEngineTest.java
@@ -41,7 +41,7 @@ class BroadcastTableBroadcastRoutingEngineTest {
         Collection<String> broadcastRuleTableNames = 
Collections.singleton("t_address");
         BroadcastTableBroadcastRoutingEngine engine = new 
BroadcastTableBroadcastRoutingEngine(broadcastRuleTableNames);
         BroadcastRule broadcastRule = mock(BroadcastRule.class);
-        
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
 "ds_1"));
+        
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", 
"ds_1"));
         
when(broadcastRule.getBroadcastRuleTableNames(any())).thenReturn(Collections.singleton("t_address"));
         RouteContext routeContext = engine.route(new RouteContext(), 
broadcastRule);
         assertThat(routeContext.getRouteUnits().size(), is(2));
@@ -55,7 +55,7 @@ class BroadcastTableBroadcastRoutingEngineTest {
         Collection<String> broadcastRuleTableNames = 
Collections.singleton("t_address");
         BroadcastTableBroadcastRoutingEngine engine = new 
BroadcastTableBroadcastRoutingEngine(broadcastRuleTableNames);
         BroadcastRule broadcastRule = mock(BroadcastRule.class);
-        
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
 "ds_1"));
+        
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", 
"ds_1"));
         
when(broadcastRule.getBroadcastRuleTableNames(any())).thenReturn(Collections.emptyList());
         RouteContext routeContext = engine.route(new RouteContext(), 
broadcastRule);
         assertThat(routeContext.getRouteUnits().size(), is(2));
diff --git 
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngineTest.java
 
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngineTest.java
index a8ea2ec2092..522a87f0c08 100644
--- 
a/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngineTest.java
+++ 
b/features/broadcast/core/src/test/java/org/apache/shardingsphere/broadcast/route/engine/type/unicast/BroadcastUnicastRoutingEngineTest.java
@@ -42,7 +42,7 @@ class BroadcastUnicastRoutingEngineTest {
     @BeforeEach
     void setUp() {
         broadcastRule = mock(BroadcastRule.class);
-        
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0",
 "ds_1"));
+        
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", 
"ds_1"));
     }
     
     @Test
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/util/YamlDatabaseConfigurationImportExecutor.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/util/YamlDatabaseConfigurationImportExecutor.java
index 2adc0108c3c..9df70f3f0c8 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/util/YamlDatabaseConfigurationImportExecutor.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/util/YamlDatabaseConfigurationImportExecutor.java
@@ -286,7 +286,8 @@ public final class YamlDatabaseConfigurationImportExecutor {
         allRuleConfigs.add(broadcastRuleConfig);
         database.getRuleMetaData().getRules().add(new 
BroadcastRule(broadcastRuleConfig, database.getName(),
                 
database.getResourceMetaData().getStorageUnits().entrySet().stream()
-                        .collect(Collectors.toMap(Entry::getKey, entry -> 
entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, 
LinkedHashMap::new))));
+                        .collect(Collectors.toMap(Entry::getKey, entry -> 
entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, 
LinkedHashMap::new)),
+                database.getRuleMetaData().getRules()));
     }
     
     private void addSingleRuleConfiguration(final SingleRuleConfiguration 
singleRuleConfig, final Collection<RuleConfiguration> allRuleConfigs, final 
ShardingSphereDatabase database) {

Reply via email to