This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new eebef9a Decouple StandardJdbcUrlParser and MySQLIncrementalDumper (#14658)
eebef9a is described below
commit eebef9a10a3799d9ba6f2541de86dc8ee63a2f8f
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Jan 10 15:15:44 2022 +0800
Decouple StandardJdbcUrlParser and MySQLIncrementalDumper (#14658)
---
.../mysql/ingest/MySQLIncrementalDumper.java | 14 ++++++-------
.../mysql/ingest/MySQLIncrementalDumperTest.java | 23 +++++++++-------------
2 files changed, 16 insertions(+), 21 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 4696b3c..6c90ac1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -47,8 +47,8 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.metadata.MySQ
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.metadata.MySQLColumnMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrl;
-import
org.apache.shardingsphere.infra.database.metadata.url.StandardJdbcUrlParser;
+import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
import java.io.Serializable;
@@ -97,15 +97,15 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
private void dump() {
HikariConfig hikariConfig = ((StandardPipelineDataSourceConfiguration)
dumperConfig.getDataSourceConfig()).getHikariConfig();
log.info("incremental dump, jdbcUrl={}", hikariConfig.getJdbcUrl());
- JdbcUrl jdbcUrl = new
StandardJdbcUrlParser().parse(hikariConfig.getJdbcUrl());
- MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(),
jdbcUrl.getHostname(), jdbcUrl.getPort(), hikariConfig.getUsername(),
hikariConfig.getPassword()));
+ DataSourceMetaData metaData =
DatabaseTypeRegistry.getActualDatabaseType("MySQL").getDataSourceMetaData(hikariConfig.getJdbcUrl(),
null);
+ MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(),
metaData.getHostname(), metaData.getPort(), hikariConfig.getUsername(),
hikariConfig.getPassword()));
client.connect();
client.subscribe(binlogPosition.getFilename(),
binlogPosition.getPosition());
int eventCount = 0;
while (isRunning()) {
AbstractBinlogEvent event = client.poll();
if (null != event) {
- handleEvent(jdbcUrl, event);
+ handleEvent(metaData.getCatalog(), event);
eventCount++;
}
}
@@ -113,8 +113,8 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
pushRecord(new FinishedRecord(new PlaceholderPosition()));
}
- private void handleEvent(final JdbcUrl jdbcUrl, final AbstractBinlogEvent
event) {
- if (event instanceof PlaceholderEvent || filter(jdbcUrl.getDatabase(),
(AbstractRowsEvent) event)) {
+ private void handleEvent(final String catalog, final AbstractBinlogEvent
event) {
+ if (event instanceof PlaceholderEvent || filter(catalog,
(AbstractRowsEvent) event)) {
createPlaceholderRecord(event);
return;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 27eaf1e..841488f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -34,8 +34,6 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteR
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
-import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrl;
-import
org.apache.shardingsphere.infra.database.metadata.url.StandardJdbcUrlParser;
import org.junit.Before;
import org.junit.Test;
@@ -54,8 +52,6 @@ import static org.junit.Assert.assertTrue;
public final class MySQLIncrementalDumperTest {
- private static final String URL =
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
-
private MySQLIncrementalDumper incrementalDumper;
private MemoryPipelineChannel channel;
@@ -64,15 +60,14 @@ public final class MySQLIncrementalDumperTest {
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
initTableData(dumperConfig);
- channel = new MemoryPipelineChannel(records -> {
- });
+ channel = new MemoryPipelineChannel(records -> { });
incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new
BinlogPosition("binlog-000001", 4L));
incrementalDumper.setChannel(channel);
}
private DumperConfiguration mockDumperConfiguration() {
DumperConfiguration result = new DumperConfiguration();
- result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration(URL, "root", "root"));
+ result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
"root", "root"));
Map<String, String> tableNameMap = new HashedMap<>(1);
tableNameMap.put("t_order", "t_order");
result.setTableNameMap(tableNameMap);
@@ -98,7 +93,7 @@ public final class MySQLIncrementalDumperTest {
List<Serializable[]> rows = new ArrayList<>(1);
rows.add(new String[]{"1", "order"});
rowsEvent.setAfterRows(rows);
- invokeHandleEvent(new StandardJdbcUrlParser().parse(URL), rowsEvent);
+ invokeHandleEvent(rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof DataRecord);
@@ -116,7 +111,7 @@ public final class MySQLIncrementalDumperTest {
afterRows.add(new String[]{"1", "order_new"});
rowsEvent.setBeforeRows(beforeRows);
rowsEvent.setAfterRows(afterRows);
- invokeHandleEvent(new StandardJdbcUrlParser().parse(URL), rowsEvent);
+ invokeHandleEvent(rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof DataRecord);
@@ -131,7 +126,7 @@ public final class MySQLIncrementalDumperTest {
List<Serializable[]> rows = new ArrayList<>(1);
rows.add(new String[]{"1", "order"});
rowsEvent.setBeforeRows(rows);
- invokeHandleEvent(new StandardJdbcUrlParser().parse(URL), rowsEvent);
+ invokeHandleEvent(rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof DataRecord);
@@ -140,7 +135,7 @@ public final class MySQLIncrementalDumperTest {
@Test
public void assertPlaceholderEvent() {
- invokeHandleEvent(new
StandardJdbcUrlParser().parse("jdbc:mysql://127.0.0.1:3306/test_db"), new
PlaceholderEvent());
+ invokeHandleEvent(new PlaceholderEvent());
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof PlaceholderRecord);
@@ -150,14 +145,14 @@ public final class MySQLIncrementalDumperTest {
public void assertRowsEventFiltered() {
WriteRowsEvent rowsEvent = new WriteRowsEvent();
rowsEvent.setSchemaName("unknown_schema");
- invokeHandleEvent(new StandardJdbcUrlParser().parse(URL), rowsEvent);
+ invokeHandleEvent(rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof PlaceholderRecord);
}
@SneakyThrows({NoSuchMethodException.class,
ReflectiveOperationException.class})
- private void invokeHandleEvent(final JdbcUrl jdbcUrl, final
AbstractBinlogEvent event) {
- ReflectionUtil.invokeMethod(incrementalDumper, "handleEvent", new
Class[]{JdbcUrl.class, AbstractBinlogEvent.class}, new Object[]{jdbcUrl,
event});
+ private void invokeHandleEvent(final AbstractBinlogEvent event) {
+ ReflectionUtil.invokeMethod(incrementalDumper, "handleEvent", new
Class[]{String.class, AbstractBinlogEvent.class}, new Object[]{"", event});
}
}