This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new eebef9a  Decouple StandardJdbcUrlParser and MySQLIncrementalDumper 
(#14658)
eebef9a is described below

commit eebef9a10a3799d9ba6f2541de86dc8ee63a2f8f
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Jan 10 15:15:44 2022 +0800

    Decouple StandardJdbcUrlParser and MySQLIncrementalDumper (#14658)
---
 .../mysql/ingest/MySQLIncrementalDumper.java       | 14 ++++++-------
 .../mysql/ingest/MySQLIncrementalDumperTest.java   | 23 +++++++++-------------
 2 files changed, 16 insertions(+), 21 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 4696b3c..6c90ac1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -47,8 +47,8 @@ import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.metadata.MySQ
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.metadata.MySQLColumnMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrl;
-import 
org.apache.shardingsphere.infra.database.metadata.url.StandardJdbcUrlParser;
+import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
 
 import java.io.Serializable;
@@ -97,15 +97,15 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
     private void dump() {
         HikariConfig hikariConfig = ((StandardPipelineDataSourceConfiguration) 
dumperConfig.getDataSourceConfig()).getHikariConfig();
         log.info("incremental dump, jdbcUrl={}", hikariConfig.getJdbcUrl());
-        JdbcUrl jdbcUrl = new 
StandardJdbcUrlParser().parse(hikariConfig.getJdbcUrl());
-        MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), 
jdbcUrl.getHostname(), jdbcUrl.getPort(), hikariConfig.getUsername(), 
hikariConfig.getPassword()));
+        DataSourceMetaData metaData = 
DatabaseTypeRegistry.getActualDatabaseType("MySQL").getDataSourceMetaData(hikariConfig.getJdbcUrl(),
 null);
+        MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), 
metaData.getHostname(), metaData.getPort(), hikariConfig.getUsername(), 
hikariConfig.getPassword()));
         client.connect();
         client.subscribe(binlogPosition.getFilename(), 
binlogPosition.getPosition());
         int eventCount = 0;
         while (isRunning()) {
             AbstractBinlogEvent event = client.poll();
             if (null != event) {
-                handleEvent(jdbcUrl, event);
+                handleEvent(metaData.getCatalog(), event);
                 eventCount++;
             }
         }
@@ -113,8 +113,8 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
         pushRecord(new FinishedRecord(new PlaceholderPosition()));
     }
     
-    private void handleEvent(final JdbcUrl jdbcUrl, final AbstractBinlogEvent 
event) {
-        if (event instanceof PlaceholderEvent || filter(jdbcUrl.getDatabase(), 
(AbstractRowsEvent) event)) {
+    private void handleEvent(final String catalog, final AbstractBinlogEvent 
event) {
+        if (event instanceof PlaceholderEvent || filter(catalog, 
(AbstractRowsEvent) event)) {
             createPlaceholderRecord(event);
             return;
         }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 27eaf1e..841488f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -34,8 +34,6 @@ import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteR
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
-import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrl;
-import 
org.apache.shardingsphere.infra.database.metadata.url.StandardJdbcUrlParser;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -54,8 +52,6 @@ import static org.junit.Assert.assertTrue;
 
 public final class MySQLIncrementalDumperTest {
     
-    private static final String URL = 
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
-    
     private MySQLIncrementalDumper incrementalDumper;
     
     private MemoryPipelineChannel channel;
@@ -64,15 +60,14 @@ public final class MySQLIncrementalDumperTest {
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
         initTableData(dumperConfig);
-        channel = new MemoryPipelineChannel(records -> {
-        });
+        channel = new MemoryPipelineChannel(records -> { });
         incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new 
BinlogPosition("binlog-000001", 4L));
         incrementalDumper.setChannel(channel);
     }
     
     private DumperConfiguration mockDumperConfiguration() {
         DumperConfiguration result = new DumperConfiguration();
-        result.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration(URL, "root", "root"));
+        result.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
 "root", "root"));
         Map<String, String> tableNameMap = new HashedMap<>(1);
         tableNameMap.put("t_order", "t_order");
         result.setTableNameMap(tableNameMap);
@@ -98,7 +93,7 @@ public final class MySQLIncrementalDumperTest {
         List<Serializable[]> rows = new ArrayList<>(1);
         rows.add(new String[]{"1", "order"});
         rowsEvent.setAfterRows(rows);
-        invokeHandleEvent(new StandardJdbcUrlParser().parse(URL), rowsEvent);
+        invokeHandleEvent(rowsEvent);
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof DataRecord);
@@ -116,7 +111,7 @@ public final class MySQLIncrementalDumperTest {
         afterRows.add(new String[]{"1", "order_new"});
         rowsEvent.setBeforeRows(beforeRows);
         rowsEvent.setAfterRows(afterRows);
-        invokeHandleEvent(new StandardJdbcUrlParser().parse(URL), rowsEvent);
+        invokeHandleEvent(rowsEvent);
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof DataRecord);
@@ -131,7 +126,7 @@ public final class MySQLIncrementalDumperTest {
         List<Serializable[]> rows = new ArrayList<>(1);
         rows.add(new String[]{"1", "order"});
         rowsEvent.setBeforeRows(rows);
-        invokeHandleEvent(new StandardJdbcUrlParser().parse(URL), rowsEvent);
+        invokeHandleEvent(rowsEvent);
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof DataRecord);
@@ -140,7 +135,7 @@ public final class MySQLIncrementalDumperTest {
     
     @Test
     public void assertPlaceholderEvent() {
-        invokeHandleEvent(new 
StandardJdbcUrlParser().parse("jdbc:mysql://127.0.0.1:3306/test_db"), new 
PlaceholderEvent());
+        invokeHandleEvent(new PlaceholderEvent());
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof PlaceholderRecord);
@@ -150,14 +145,14 @@ public final class MySQLIncrementalDumperTest {
     public void assertRowsEventFiltered() {
         WriteRowsEvent rowsEvent = new WriteRowsEvent();
         rowsEvent.setSchemaName("unknown_schema");
-        invokeHandleEvent(new StandardJdbcUrlParser().parse(URL), rowsEvent);
+        invokeHandleEvent(rowsEvent);
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof PlaceholderRecord);
     }
     
     @SneakyThrows({NoSuchMethodException.class, 
ReflectiveOperationException.class})
-    private void invokeHandleEvent(final JdbcUrl jdbcUrl, final 
AbstractBinlogEvent event) {
-        ReflectionUtil.invokeMethod(incrementalDumper, "handleEvent", new 
Class[]{JdbcUrl.class, AbstractBinlogEvent.class}, new Object[]{jdbcUrl, 
event});
+    private void invokeHandleEvent(final AbstractBinlogEvent event) {
+        ReflectionUtil.invokeMethod(incrementalDumper, "handleEvent", new 
Class[]{String.class, AbstractBinlogEvent.class}, new Object[]{"", event});
     }
 }

Reply via email to