This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 65ca96a7c66 Refactor ShardingSpherePipelineDataSourceCreator (#32475)
65ca96a7c66 is described below

commit 65ca96a7c66e49d242edf5249800a497fef1cd86
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Aug 12 21:35:37 2024 +0800

    Refactor ShardingSpherePipelineDataSourceCreator (#32475)
    
    * Refactor ShardingSpherePipelineDataSourceCreator
    
    * Refactor ShardingSpherePipelineDataSourceCreator
    
    * Refactor ShardingSpherePipelineDataSourceCreator
    
    * Refactor ShardingSpherePipelineDataSourceCreator
    
    * Refactor ShardingSpherePipelineDataSourceCreator
---
 ...rdingSpherePipelineDataSourceConfiguration.java |  4 +-
 .../ShardingSpherePipelineDataSourceCreator.java   | 71 +++++++++++-----------
 2 files changed, 38 insertions(+), 37 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/ShardingSpherePipelineDataSourceConfiguration.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/ShardingSpherePipelineDataSourceConfiguration.java
index 02cf90980c6..6a38ca1103a 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/ShardingSpherePipelineDataSourceConfiguration.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/ShardingSpherePipelineDataSourceConfiguration.java
@@ -65,7 +65,7 @@ public final class 
ShardingSpherePipelineDataSourceConfiguration implements Pipe
         parameter = YamlEngine.marshal(rootConfig);
         Map<String, Object> props = 
rootConfig.getDataSources().values().iterator().next();
         databaseType = DatabaseTypeFactory.get(getJdbcUrl(props));
-        appendJdbcQueryProperties(databaseType);
+        appendJdbcQueryProperties();
         adjustDataSourcePoolProperties(rootConfig.getDataSources());
     }
     
@@ -88,7 +88,7 @@ public final class 
ShardingSpherePipelineDataSourceConfiguration implements Pipe
         return result.toString();
     }
     
-    private void appendJdbcQueryProperties(final DatabaseType databaseType) {
+    private void appendJdbcQueryProperties() {
         Optional<JdbcQueryPropertiesExtension> extension = 
DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, 
databaseType);
         if (!extension.isPresent()) {
             return;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index 5b86d86cdc7..9b72133cd61 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -46,7 +46,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -58,39 +57,43 @@ public final class ShardingSpherePipelineDataSourceCreator 
implements PipelineDa
     
     @Override
     public DataSource create(final Object dataSourceConfig) throws 
SQLException {
-        YamlRootConfiguration rootConfig = 
YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig), 
YamlRootConfiguration.class);
-        removeAuthorityRule(rootConfig);
-        updateSingleRuleConfiguration(rootConfig);
-        disableSystemSchemaMetadata(rootConfig);
-        enableStreamingQuery(rootConfig);
-        Optional<YamlShardingRuleConfiguration> yamlShardingRuleConfig = 
ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(rootConfig.getRules());
-        if (yamlShardingRuleConfig.isPresent()) {
-            enableRangeQueryForInline(yamlShardingRuleConfig.get());
-            removeAuditStrategy(yamlShardingRuleConfig.get());
-        }
-        rootConfig.setMode(createStandaloneModeConfiguration());
-        return createDataSourceWithoutCache(rootConfig);
+        YamlRootConfiguration yamlRootConfig = 
YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig), 
YamlRootConfiguration.class);
+        removeAuthorityRuleConfiguration(yamlRootConfig);
+        updateSingleRuleConfiguration(yamlRootConfig);
+        disableSystemSchemaMetadata(yamlRootConfig);
+        enableStreamingQuery(yamlRootConfig);
+        updateShardingRuleConfiguration(yamlRootConfig);
+        yamlRootConfig.setMode(createStandaloneModeConfiguration());
+        return createShardingSphereDataSource(yamlRootConfig);
     }
     
-    private void removeAuthorityRule(final YamlRootConfiguration rootConfig) {
-        
rootConfig.getRules().stream().filter(YamlAuthorityRuleConfiguration.class::isInstance).findFirst().map(each
 -> rootConfig.getRules().remove(each));
+    private void removeAuthorityRuleConfiguration(final YamlRootConfiguration 
yamlRootConfig) {
+        
yamlRootConfig.getRules().removeIf(YamlAuthorityRuleConfiguration.class::isInstance);
     }
     
-    private void updateSingleRuleConfiguration(final YamlRootConfiguration 
rootConfig) {
-        
rootConfig.getRules().removeIf(YamlSingleRuleConfiguration.class::isInstance);
+    private void updateSingleRuleConfiguration(final YamlRootConfiguration 
yamlRootConfig) {
+        
yamlRootConfig.getRules().removeIf(YamlSingleRuleConfiguration.class::isInstance);
         YamlSingleRuleConfiguration singleRuleConfig = new 
YamlSingleRuleConfiguration();
         
singleRuleConfig.setTables(Collections.singletonList(SingleTableConstants.ALL_TABLES));
-        rootConfig.getRules().add(singleRuleConfig);
+        yamlRootConfig.getRules().add(singleRuleConfig);
     }
     
-    private void disableSystemSchemaMetadata(final YamlRootConfiguration 
rootConfig) {
-        
rootConfig.getProps().put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ENABLED.getKey(),
 String.valueOf(Boolean.FALSE));
+    private void disableSystemSchemaMetadata(final YamlRootConfiguration 
yamlRootConfig) {
+        
yamlRootConfig.getProps().put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ENABLED.getKey(),
 String.valueOf(Boolean.FALSE));
     }
     
     // TODO Another way is improving ExecuteQueryCallback.executeSQL to enable 
streaming query, then remove it
-    private void enableStreamingQuery(final YamlRootConfiguration rootConfig) {
+    private void enableStreamingQuery(final YamlRootConfiguration 
yamlRootConfig) {
         // Set a large enough value to enable ConnectionMode.MEMORY_STRICTLY, 
make sure streaming query work.
-        
rootConfig.getProps().put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(),
 100000);
+        
yamlRootConfig.getProps().put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(),
 100000);
+    }
+    
+    private void updateShardingRuleConfiguration(final YamlRootConfiguration 
yamlRootConfig) {
+        Optional<YamlShardingRuleConfiguration> yamlShardingRuleConfig = 
ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(yamlRootConfig.getRules());
+        if (yamlShardingRuleConfig.isPresent()) {
+            enableRangeQueryForInline(yamlShardingRuleConfig.get());
+            removeAuditStrategy(yamlShardingRuleConfig.get());
+        }
     }
     
     private void enableRangeQueryForInline(final YamlShardingRuleConfiguration 
yamlShardingRuleConfig) {
@@ -115,20 +118,18 @@ public final class 
ShardingSpherePipelineDataSourceCreator implements PipelineDa
     private YamlModeConfiguration createStandaloneModeConfiguration() {
         YamlModeConfiguration result = new YamlModeConfiguration();
         result.setType("Standalone");
-        YamlPersistRepositoryConfiguration repository = new 
YamlPersistRepositoryConfiguration();
-        result.setRepository(repository);
-        repository.setType("JDBC");
-        Properties props = new Properties();
-        repository.setProps(props);
-        props.setProperty(JDBCRepositoryPropertyKey.JDBC_URL.getKey(),
+        YamlPersistRepositoryConfiguration yamlRepositoryConfig = new 
YamlPersistRepositoryConfiguration();
+        yamlRepositoryConfig.setType("JDBC");
+        
yamlRepositoryConfig.getProps().setProperty(JDBCRepositoryPropertyKey.JDBC_URL.getKey(),
                 
String.format("jdbc:h2:mem:pipeline_db_%d;DB_CLOSE_DELAY=0;DATABASE_TO_UPPER=false;MODE=MYSQL",
 STANDALONE_DATABASE_ID.getAndIncrement()));
+        result.setRepository(yamlRepositoryConfig);
         return result;
     }
     
-    private DataSource createDataSourceWithoutCache(final 
YamlRootConfiguration rootConfig) throws SQLException {
-        Map<String, DataSource> dataSourceMap = new 
YamlDataSourceConfigurationSwapper().swapToDataSources(rootConfig.getDataSources(),
 false);
+    private DataSource createShardingSphereDataSource(final 
YamlRootConfiguration yamlRootConfig) throws SQLException {
+        Map<String, DataSource> dataSourceMap = new 
YamlDataSourceConfigurationSwapper().swapToDataSources(yamlRootConfig.getDataSources(),
 false);
         try {
-            return createDataSource(dataSourceMap, rootConfig);
+            return createShardingSphereDataSource(dataSourceMap, 
yamlRootConfig);
             // CHECKSTYLE:OFF
         } catch (final SQLException | RuntimeException ex) {
             // CHECKSTYLE:ON
@@ -137,10 +138,10 @@ public final class 
ShardingSpherePipelineDataSourceCreator implements PipelineDa
         }
     }
     
-    private DataSource createDataSource(final Map<String, DataSource> 
dataSourceMap, final YamlRootConfiguration rootConfig) throws SQLException {
-        ModeConfiguration modeConfig = null == rootConfig.getMode() ? null : 
new YamlModeConfigurationSwapper().swapToObject(rootConfig.getMode());
-        Collection<RuleConfiguration> ruleConfigs = new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(rootConfig.getRules());
-        return 
ShardingSphereDataSourceFactory.createDataSource(rootConfig.getDatabaseName(), 
modeConfig, dataSourceMap, ruleConfigs, rootConfig.getProps());
+    private DataSource createShardingSphereDataSource(final Map<String, 
DataSource> dataSourceMap, final YamlRootConfiguration yamlRootConfig) throws 
SQLException {
+        ModeConfiguration modeConfig = new 
YamlModeConfigurationSwapper().swapToObject(yamlRootConfig.getMode());
+        Collection<RuleConfiguration> ruleConfigs = new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(yamlRootConfig.getRules());
+        return 
ShardingSphereDataSourceFactory.createDataSource(yamlRootConfig.getDatabaseName(),
 modeConfig, dataSourceMap, ruleConfigs, yamlRootConfig.getProps());
     }
     
     @Override

Reply via email to