This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e131dac4696 Refactor StandardPipelineDataSourceConfiguration (#32473)
e131dac4696 is described below

commit e131dac469694b81679c076d2d324d69556d3f5d
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Aug 12 19:40:44 2024 +0800

    Refactor StandardPipelineDataSourceConfiguration (#32473)
    
    * Refactor StandardPipelineDataSourceConfiguration
    
    * Refactor StandardPipelineDataSourceConfiguration
    
    * Refactor StandardPipelineDataSourceConfiguration
---
 .../StandardPipelineDataSourceConfiguration.java   | 26 +++++-----------------
 ...tandardPipelineDataSourceConfigurationTest.java |  8 -------
 .../mysql/ingest/MySQLIncrementalDumperTest.java   |  7 +++++-
 .../postgresql/ingest/PostgreSQLWALDumperTest.java |  8 ++++++-
 .../wal/PostgreSQLLogicalReplicationTest.java      |  9 ++++++--
 .../ingest/wal/WALEventConverterTest.java          |  7 +++++-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  9 ++++++--
 .../core/importer/PipelineDataSourceSinkTest.java  |  9 ++++++--
 8 files changed, 46 insertions(+), 37 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
index 3b354bca614..22b83faddb8 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
@@ -32,7 +32,6 @@ import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSour
 
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -50,8 +49,6 @@ public final class StandardPipelineDataSourceConfiguration 
implements PipelineDa
     @Getter
     private final String parameter;
     
-    private final DataSourcePoolProperties dataSourcePoolProps;
-    
     @Getter
     private final DatabaseType databaseType;
     
@@ -64,6 +61,8 @@ public final class StandardPipelineDataSourceConfiguration 
implements PipelineDa
     @Getter
     private final String password;
     
+    private final DataSourcePoolProperties dataSourcePoolProps;
+    
     @SuppressWarnings("unchecked")
     public StandardPipelineDataSourceConfiguration(final String param) {
         this(param, YamlEngine.unmarshal(param, Map.class));
@@ -82,30 +81,17 @@ public final class StandardPipelineDataSourceConfiguration 
implements PipelineDa
             poolProps.put("url", poolProps.get("jdbcUrl"));
             poolProps.remove("jdbcUrl");
         }
-        poolProps.remove(DATA_SOURCE_CLASS_NAME);
         databaseType = 
DatabaseTypeFactory.get(String.valueOf(poolProps.get("url")));
+        poolProps.remove(DATA_SOURCE_CLASS_NAME);
         poolProps.put(DATA_SOURCE_CLASS_NAME, 
"com.zaxxer.hikari.HikariDataSource");
-        appendJdbcQueryProperties(databaseType, poolProps);
+        appendJdbcQueryProperties(poolProps);
+        url = String.valueOf(poolProps.get("url"));
         username = String.valueOf(poolProps.get("username"));
         password = String.valueOf(poolProps.get("password"));
-        url = String.valueOf(poolProps.get("url"));
         dataSourcePoolProps = new 
YamlDataSourceConfigurationSwapper().swapToDataSourcePoolProperties(poolProps);
     }
     
-    public StandardPipelineDataSourceConfiguration(final String jdbcUrl, final 
String username, final String password) {
-        this(wrapParameter(jdbcUrl, username, password));
-    }
-    
-    private static Map<String, Object> wrapParameter(final String jdbcUrl, 
final String username, final String password) {
-        Map<String, Object> result = new LinkedHashMap<>(3, 1F);
-        // Reference ConnectionPropertySynonyms
-        result.put("url", jdbcUrl);
-        result.put("username", username);
-        result.put("password", password);
-        return result;
-    }
-    
-    private void appendJdbcQueryProperties(final DatabaseType databaseType, 
final Map<String, Object> poolProps) {
+    private void appendJdbcQueryProperties(final Map<String, Object> 
poolProps) {
         Optional<JdbcQueryPropertiesExtension> extension = 
DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, 
databaseType);
         if (!extension.isPresent()) {
             return;
diff --git 
a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
 
b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
index 610f63f1588..a5ce0568e75 100644
--- 
a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
+++ 
b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
@@ -37,14 +37,6 @@ class StandardPipelineDataSourceConfigurationTest {
     
     private static final String PASSWORD = "password";
     
-    @Test
-    void assertCreateWithSimpleParameters() {
-        StandardPipelineDataSourceConfiguration actual = new 
StandardPipelineDataSourceConfiguration(JDBC_URL, USERNAME, PASSWORD);
-        assertGetConfig(actual);
-        actual = new 
StandardPipelineDataSourceConfiguration(actual.getParameter());
-        assertGetConfig(actual);
-    }
-    
     @Test
     void assertCreateWithYamlDataSourceConfiguration() {
         Map<String, Object> yamlDataSourceConfig = new HashMap<>();
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 5db258901d3..79ce7c57607 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -55,6 +55,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -96,8 +97,12 @@ class MySQLIncrementalDumperTest {
     }
     
     private IncrementalDumperContext createDumperContext() {
+        Map<String, Object> poolProps = new HashMap<>(3, 1F);
+        poolProps.put("url", "jdbc:mock://127.0.0.1:3306/test");
+        poolProps.put("username", "root");
+        poolProps.put("password", "root");
         DumperCommonContext commonContext = new DumperCommonContext(null,
-                new 
StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", 
"root", "root"),
+                new StandardPipelineDataSourceConfiguration(poolProps),
                 new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
CaseInsensitiveIdentifier("t_order"), new 
CaseInsensitiveIdentifier("t_order"))),
                 new TableAndSchemaNameMapper(Collections.emptyMap()));
         return new IncrementalDumperContext(commonContext, null, false);
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index f353cd1d16c..ec903e7e922 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -49,6 +49,8 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -106,8 +108,12 @@ class PostgreSQLWALDumperTest {
     }
     
     private IncrementalDumperContext createDumperContext(final String jdbcUrl, 
final String username, final String password) {
+        Map<String, Object> poolProps = new HashMap<>(3, 1F);
+        poolProps.put("url", jdbcUrl);
+        poolProps.put("username", username);
+        poolProps.put("password", password);
         DumperCommonContext commonContext = new DumperCommonContext(null,
-                new StandardPipelineDataSourceConfiguration(jdbcUrl, username, 
password),
+                new StandardPipelineDataSourceConfiguration(poolProps),
                 new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
CaseInsensitiveIdentifier("t_order_0"), new 
CaseInsensitiveIdentifier("t_order"))),
                 new TableAndSchemaNameMapper(Collections.emptyMap()));
         return new IncrementalDumperContext(commonContext, "0101123456", 
false);
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
index 96b4df144f1..f5cfc364f63 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
@@ -34,6 +34,8 @@ import 
org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -66,8 +68,11 @@ class PostgreSQLLogicalReplicationTest {
     
     @Test
     void assertCreatePgConnectionSuccess() throws SQLException {
-        Connection connection = logicalReplication.createConnection(
-                new 
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
 "root", "root"));
+        Map<String, Object> poolProps = new HashMap<>(3, 1F);
+        poolProps.put("url", 
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL");
+        poolProps.put("username", "root");
+        poolProps.put("password", "root");
+        Connection connection = logicalReplication.createConnection(new 
StandardPipelineDataSourceConfiguration(poolProps));
         assertFalse(connection.isClosed());
     }
     
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 8492f2756c7..02904e20a0a 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -54,6 +54,7 @@ import java.sql.Statement;
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -86,8 +87,12 @@ class WALEventConverterTest {
     }
     
     private IncrementalDumperContext mockDumperContext() {
+        Map<String, Object> poolProps = new HashMap<>(3, 1F);
+        poolProps.put("url", 
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL");
+        poolProps.put("username", "root");
+        poolProps.put("password", "root");
         DumperCommonContext commonContext = new DumperCommonContext(null,
-                new 
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
 "root", "root"),
+                new StandardPipelineDataSourceConfiguration(poolProps),
                 new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
CaseInsensitiveIdentifier("t_order"), new 
CaseInsensitiveIdentifier("t_order"))),
                 new TableAndSchemaNameMapper(Collections.emptyMap()));
         return new IncrementalDumperContext(commonContext, null, false);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 9c94a3e907f..cb5bc9c27ca 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -62,7 +62,9 @@ import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
@@ -167,8 +169,11 @@ class CDCE2EIT {
     }
     
     private PipelineDataSourceWrapper createStandardDataSource(final 
PipelineContainerComposer containerComposer, final String storageUnitName) {
-        return new PipelineDataSourceWrapper(new 
StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName,
 false),
-                containerComposer.getUsername(), 
containerComposer.getPassword()));
+        Map<String, Object> poolProps = new HashMap<>(3, 1F);
+        poolProps.put("url", 
containerComposer.getActualJdbcUrlTemplate(storageUnitName, false));
+        poolProps.put("username", containerComposer.getUsername());
+        poolProps.put("password", containerComposer.getPassword());
+        return new PipelineDataSourceWrapper(new 
StandardPipelineDataSourceConfiguration(poolProps));
     }
     
     private CDCClient buildCDCClientAndStart(final PipelineDataSourceWrapper 
dataSource, final PipelineContainerComposer containerComposer) {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 535cbec67de..7fa97e30e6c 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -46,6 +46,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -65,8 +66,7 @@ class PipelineDataSourceSinkTest {
     
     private static final String TABLE_NAME = "test_table";
     
-    private final PipelineDataSourceConfiguration dataSourceConfig = new 
StandardPipelineDataSourceConfiguration(
-            
"jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root",
 "root", "root");
+    private PipelineDataSourceConfiguration dataSourceConfig;
     
     @Mock
     private PipelineChannel channel;
@@ -81,6 +81,11 @@ class PipelineDataSourceSinkTest {
     
     @BeforeEach
     void setUp() throws SQLException {
+        Map<String, Object> poolProps = new HashMap<>(3, 1F);
+        poolProps.put("url", 
"jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root");
+        poolProps.put("username", "root");
+        poolProps.put("password", "root");
+        dataSourceConfig = new 
StandardPipelineDataSourceConfiguration(poolProps);
         PipelineSink pipelineSink = new 
PipelineDataSourceSink(mockImporterConfiguration(), 
mockPipelineDataSourceManager());
         importer = new SingleChannelConsumerImporter(channel, 100, 1000L, 
pipelineSink, new FixtureTransmissionJobItemContext());
     }

Reply via email to