This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e131dac4696 Refactor StandardPipelineDataSourceConfiguration (#32473)
e131dac4696 is described below
commit e131dac469694b81679c076d2d324d69556d3f5d
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Aug 12 19:40:44 2024 +0800
Refactor StandardPipelineDataSourceConfiguration (#32473)
* Refactor StandardPipelineDataSourceConfiguration
* Refactor StandardPipelineDataSourceConfiguration
* Refactor StandardPipelineDataSourceConfiguration
---
.../StandardPipelineDataSourceConfiguration.java | 26 +++++-----------------
...tandardPipelineDataSourceConfigurationTest.java | 8 -------
.../mysql/ingest/MySQLIncrementalDumperTest.java | 7 +++++-
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 8 ++++++-
.../wal/PostgreSQLLogicalReplicationTest.java | 9 ++++++--
.../ingest/wal/WALEventConverterTest.java | 7 +++++-
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 9 ++++++--
.../core/importer/PipelineDataSourceSinkTest.java | 9 ++++++--
8 files changed, 46 insertions(+), 37 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
index 3b354bca614..22b83faddb8 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java
@@ -32,7 +32,6 @@ import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSour
import java.util.Arrays;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -50,8 +49,6 @@ public final class StandardPipelineDataSourceConfiguration
implements PipelineDa
@Getter
private final String parameter;
- private final DataSourcePoolProperties dataSourcePoolProps;
-
@Getter
private final DatabaseType databaseType;
@@ -64,6 +61,8 @@ public final class StandardPipelineDataSourceConfiguration
implements PipelineDa
@Getter
private final String password;
+ private final DataSourcePoolProperties dataSourcePoolProps;
+
@SuppressWarnings("unchecked")
public StandardPipelineDataSourceConfiguration(final String param) {
this(param, YamlEngine.unmarshal(param, Map.class));
@@ -82,30 +81,17 @@ public final class StandardPipelineDataSourceConfiguration
implements PipelineDa
poolProps.put("url", poolProps.get("jdbcUrl"));
poolProps.remove("jdbcUrl");
}
- poolProps.remove(DATA_SOURCE_CLASS_NAME);
databaseType =
DatabaseTypeFactory.get(String.valueOf(poolProps.get("url")));
+ poolProps.remove(DATA_SOURCE_CLASS_NAME);
poolProps.put(DATA_SOURCE_CLASS_NAME,
"com.zaxxer.hikari.HikariDataSource");
- appendJdbcQueryProperties(databaseType, poolProps);
+ appendJdbcQueryProperties(poolProps);
+ url = String.valueOf(poolProps.get("url"));
username = String.valueOf(poolProps.get("username"));
password = String.valueOf(poolProps.get("password"));
- url = String.valueOf(poolProps.get("url"));
dataSourcePoolProps = new
YamlDataSourceConfigurationSwapper().swapToDataSourcePoolProperties(poolProps);
}
- public StandardPipelineDataSourceConfiguration(final String jdbcUrl, final
String username, final String password) {
- this(wrapParameter(jdbcUrl, username, password));
- }
-
- private static Map<String, Object> wrapParameter(final String jdbcUrl,
final String username, final String password) {
- Map<String, Object> result = new LinkedHashMap<>(3, 1F);
- // Reference ConnectionPropertySynonyms
- result.put("url", jdbcUrl);
- result.put("username", username);
- result.put("password", password);
- return result;
- }
-
- private void appendJdbcQueryProperties(final DatabaseType databaseType,
final Map<String, Object> poolProps) {
+ private void appendJdbcQueryProperties(final Map<String, Object>
poolProps) {
Optional<JdbcQueryPropertiesExtension> extension =
DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class,
databaseType);
if (!extension.isPresent()) {
return;
diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
index 610f63f1588..a5ce0568e75 100644
--- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
+++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java
@@ -37,14 +37,6 @@ class StandardPipelineDataSourceConfigurationTest {
private static final String PASSWORD = "password";
- @Test
- void assertCreateWithSimpleParameters() {
- StandardPipelineDataSourceConfiguration actual = new
StandardPipelineDataSourceConfiguration(JDBC_URL, USERNAME, PASSWORD);
- assertGetConfig(actual);
- actual = new
StandardPipelineDataSourceConfiguration(actual.getParameter());
- assertGetConfig(actual);
- }
-
@Test
void assertCreateWithYamlDataSourceConfiguration() {
Map<String, Object> yamlDataSourceConfig = new HashMap<>();
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 5db258901d3..79ce7c57607 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -55,6 +55,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -96,8 +97,12 @@ class MySQLIncrementalDumperTest {
}
private IncrementalDumperContext createDumperContext() {
+ Map<String, Object> poolProps = new HashMap<>(3, 1F);
+ poolProps.put("url", "jdbc:mock://127.0.0.1:3306/test");
+ poolProps.put("username", "root");
+ poolProps.put("password", "root");
DumperCommonContext commonContext = new DumperCommonContext(null,
- new
StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test",
"root", "root"),
+ new StandardPipelineDataSourceConfiguration(poolProps),
new ActualAndLogicTableNameMapper(Collections.singletonMap(new
CaseInsensitiveIdentifier("t_order"), new
CaseInsensitiveIdentifier("t_order"))),
new TableAndSchemaNameMapper(Collections.emptyMap()));
return new IncrementalDumperContext(commonContext, null, false);
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index f353cd1d16c..ec903e7e922 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -49,6 +49,8 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -106,8 +108,12 @@ class PostgreSQLWALDumperTest {
}
private IncrementalDumperContext createDumperContext(final String jdbcUrl,
final String username, final String password) {
+ Map<String, Object> poolProps = new HashMap<>(3, 1F);
+ poolProps.put("url", jdbcUrl);
+ poolProps.put("username", username);
+ poolProps.put("password", password);
DumperCommonContext commonContext = new DumperCommonContext(null,
- new StandardPipelineDataSourceConfiguration(jdbcUrl, username,
password),
+ new StandardPipelineDataSourceConfiguration(poolProps),
new ActualAndLogicTableNameMapper(Collections.singletonMap(new
CaseInsensitiveIdentifier("t_order_0"), new
CaseInsensitiveIdentifier("t_order"))),
new TableAndSchemaNameMapper(Collections.emptyMap()));
return new IncrementalDumperContext(commonContext, "0101123456",
false);
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
index 96b4df144f1..f5cfc364f63 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
@@ -34,6 +34,8 @@ import
org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -66,8 +68,11 @@ class PostgreSQLLogicalReplicationTest {
@Test
void assertCreatePgConnectionSuccess() throws SQLException {
- Connection connection = logicalReplication.createConnection(
- new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
"root", "root"));
+ Map<String, Object> poolProps = new HashMap<>(3, 1F);
+ poolProps.put("url",
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL");
+ poolProps.put("username", "root");
+ poolProps.put("password", "root");
+ Connection connection = logicalReplication.createConnection(new
StandardPipelineDataSourceConfiguration(poolProps));
assertFalse(connection.isClosed());
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 8492f2756c7..02904e20a0a 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -54,6 +54,7 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -86,8 +87,12 @@ class WALEventConverterTest {
}
private IncrementalDumperContext mockDumperContext() {
+ Map<String, Object> poolProps = new HashMap<>(3, 1F);
+ poolProps.put("url",
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL");
+ poolProps.put("username", "root");
+ poolProps.put("password", "root");
DumperCommonContext commonContext = new DumperCommonContext(null,
- new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
"root", "root"),
+ new StandardPipelineDataSourceConfiguration(poolProps),
new ActualAndLogicTableNameMapper(Collections.singletonMap(new
CaseInsensitiveIdentifier("t_order"), new
CaseInsensitiveIdentifier("t_order"))),
new TableAndSchemaNameMapper(Collections.emptyMap()));
return new IncrementalDumperContext(commonContext, null, false);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 9c94a3e907f..cb5bc9c27ca 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -62,7 +62,9 @@ import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
@@ -167,8 +169,11 @@ class CDCE2EIT {
}
private PipelineDataSourceWrapper createStandardDataSource(final
PipelineContainerComposer containerComposer, final String storageUnitName) {
- return new PipelineDataSourceWrapper(new
StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName,
false),
- containerComposer.getUsername(),
containerComposer.getPassword()));
+ Map<String, Object> poolProps = new HashMap<>(3, 1F);
+ poolProps.put("url",
containerComposer.getActualJdbcUrlTemplate(storageUnitName, false));
+ poolProps.put("username", containerComposer.getUsername());
+ poolProps.put("password", containerComposer.getPassword());
+ return new PipelineDataSourceWrapper(new
StandardPipelineDataSourceConfiguration(poolProps));
}
private CDCClient buildCDCClientAndStart(final PipelineDataSourceWrapper
dataSource, final PipelineContainerComposer containerComposer) {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 535cbec67de..7fa97e30e6c 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -46,6 +46,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -65,8 +66,7 @@ class PipelineDataSourceSinkTest {
private static final String TABLE_NAME = "test_table";
- private final PipelineDataSourceConfiguration dataSourceConfig = new
StandardPipelineDataSourceConfiguration(
-
"jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root",
"root", "root");
+ private PipelineDataSourceConfiguration dataSourceConfig;
@Mock
private PipelineChannel channel;
@@ -81,6 +81,11 @@ class PipelineDataSourceSinkTest {
@BeforeEach
void setUp() throws SQLException {
+ Map<String, Object> poolProps = new HashMap<>(3, 1F);
+ poolProps.put("url",
"jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root");
+ poolProps.put("username", "root");
+ poolProps.put("password", "root");
+ dataSourceConfig = new
StandardPipelineDataSourceConfiguration(poolProps);
PipelineSink pipelineSink = new
PipelineDataSourceSink(mockImporterConfiguration(),
mockPipelineDataSourceManager());
importer = new SingleChannelConsumerImporter(channel, 100, 1000L,
pipelineSink, new FixtureTransmissionJobItemContext());
}