This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c07a25c1d17 Refactor AbstractIncrementalDumper (#21329)
c07a25c1d17 is described below

commit c07a25c1d1796c1cc2e57d5bdd155d9cf4fcb107
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Oct 3 20:49:29 2022 +0800

    Refactor AbstractIncrementalDumper (#21329)
    
    * Refactor AbstractIncrementalDumper
    
    * Refactor AbstractDataSourcePreparer
---
 .../prepare/datasource/AbstractDataSourcePreparer.java  |  5 +++--
 .../pipeline/mysql/ingest/MySQLIncrementalDumper.java   | 13 +++++--------
 .../mysql/ingest/MySQLIncrementalDumperTest.java        |  4 ++--
 .../pipeline/opengauss/ingest/OpenGaussWalDumper.java   | 17 +++++++++--------
 .../pipeline/postgresql/ingest/PostgreSQLWalDumper.java | 17 +++++++++--------
 5 files changed, 28 insertions(+), 28 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 32baf21e13d..13a134fcd9b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -52,7 +52,7 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
     private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple 
primary keys for table", "already exists"};
     
     @Override
-    public void prepareTargetSchemas(final PrepareTargetSchemasParameter 
parameter) throws SQLException {
+    public void prepareTargetSchemas(final PrepareTargetSchemasParameter 
parameter) {
         DatabaseType targetDatabaseType = parameter.getTargetDatabaseType();
         if (!targetDatabaseType.isSchemaAvailable()) {
             log.info("prepareTargetSchemas, target database does not support 
schema, ignore, targetDatabaseType={}", targetDatabaseType);
@@ -76,12 +76,13 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
         log.info("prepareTargetSchemas, createdSchemaNames={}, 
defaultSchema={}", createdSchemaNames, defaultSchema);
     }
     
-    private void executeCreateSchema(final PipelineDataSourceManager 
dataSourceManager, final PipelineDataSourceConfiguration 
targetDataSourceConfig, final String sql) throws SQLException {
+    private void executeCreateSchema(final PipelineDataSourceManager 
dataSourceManager, final PipelineDataSourceConfiguration 
targetDataSourceConfig, final String sql) {
         log.info("prepareTargetSchemas, sql={}", sql);
         try (Connection connection = getCachedDataSource(dataSourceManager, 
targetDataSourceConfig).getConnection()) {
             try (Statement statement = connection.createStatement()) {
                 statement.execute(sql);
             }
+        } catch (final SQLException ignored) {
         }
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 4e896f3c504..9d3cee95a05 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -54,7 +54,6 @@ import java.io.Serializable;
 import java.security.SecureRandom;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Random;
 
 /**
  * MySQL incremental dumper.
@@ -62,13 +61,11 @@ import java.util.Random;
 @Slf4j
 public final class MySQLIncrementalDumper extends 
AbstractIncrementalDumper<BinlogPosition> {
     
-    private final BinlogPosition binlogPosition;
-    
     private final DumperConfiguration dumperConfig;
     
-    private final PipelineTableMetaDataLoader metaDataLoader;
+    private final BinlogPosition binlogPosition;
     
-    private final Random random = new SecureRandom();
+    private final PipelineTableMetaDataLoader metaDataLoader;
     
     private final PipelineChannel channel;
     
@@ -79,15 +76,15 @@ public final class MySQLIncrementalDumper extends 
AbstractIncrementalDumper<Binl
     public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, 
final IngestPosition<BinlogPosition> binlogPosition,
                                   final PipelineChannel channel, final 
PipelineTableMetaDataLoader metaDataLoader) {
         super(dumperConfig, binlogPosition, channel, metaDataLoader);
-        this.binlogPosition = (BinlogPosition) binlogPosition;
-        this.dumperConfig = dumperConfig;
         Preconditions.checkArgument(dumperConfig.getDataSourceConfig() 
instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only 
support StandardPipelineDataSourceConfiguration");
+        this.dumperConfig = dumperConfig;
+        this.binlogPosition = (BinlogPosition) binlogPosition;
         this.channel = channel;
         this.metaDataLoader = metaDataLoader;
         YamlJdbcConfiguration jdbcConfig = 
((StandardPipelineDataSourceConfiguration) 
dumperConfig.getDataSourceConfig()).getJdbcConfig();
         log.info("incremental dump, jdbcUrl={}", jdbcConfig.getJdbcUrl());
         DataSourceMetaData metaData = 
DatabaseTypeFactory.getInstance("MySQL").getDataSourceMetaData(jdbcConfig.getJdbcUrl(),
 null);
-        client = new MySQLClient(new ConnectInfo(random.nextInt(), 
metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), 
jdbcConfig.getPassword()));
+        client = new MySQLClient(new ConnectInfo(new SecureRandom().nextInt(), 
metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), 
jdbcConfig.getPassword()));
         catalog = metaData.getCatalog();
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 63813bd2520..a2fadd35565 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -68,12 +68,12 @@ import static org.mockito.Mockito.when;
 @RunWith(MockitoJUnitRunner.class)
 public final class MySQLIncrementalDumperTest {
     
+    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+    
     private MySQLIncrementalDumper incrementalDumper;
     
     private MultiplexMemoryPipelineChannel channel;
     
-    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
-    
     @Mock
     private PipelineTableMetaData pipelineTableMetaData;
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 55abb75930a..7660c49d3ca 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -35,6 +35,7 @@ import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventCon
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.opengauss.jdbc.PgConnection;
 import org.opengauss.replication.PGReplicationStream;
@@ -48,26 +49,26 @@ import java.sql.SQLException;
 @Slf4j
 public final class OpenGaussWalDumper extends 
AbstractIncrementalDumper<WalPosition> {
     
-    private final WalPosition walPosition;
-    
     private final DumperConfiguration dumperConfig;
     
-    private final OpenGaussLogicalReplication logicalReplication = new 
OpenGaussLogicalReplication();
+    private final WalPosition walPosition;
+    
+    private final PipelineChannel channel;
     
     private final WalEventConverter walEventConverter;
     
-    private final PipelineChannel channel;
+    private final OpenGaussLogicalReplication logicalReplication;
     
     public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final 
IngestPosition<WalPosition> position,
                               final PipelineChannel channel, final 
PipelineTableMetaDataLoader metaDataLoader) {
         super(dumperConfig, position, channel, metaDataLoader);
-        walPosition = (WalPosition) position;
-        if 
(!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()))
 {
-            throw new UnsupportedSQLOperationException("PostgreSQLWalDumper 
only support PipelineDataSourceConfiguration");
-        }
+        
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
+                () -> new 
UnsupportedSQLOperationException("PostgreSQLWalDumper only support 
PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
+        walPosition = (WalPosition) position;
         this.channel = channel;
         walEventConverter = new WalEventConverter(dumperConfig, 
metaDataLoader);
+        logicalReplication = new OpenGaussLogicalReplication();
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index d5f3ef5ae11..ab97b500973 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -35,6 +35,7 @@ import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Post
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.postgresql.jdbc.PgConnection;
 import org.postgresql.replication.PGReplicationStream;
@@ -49,26 +50,26 @@ import java.sql.SQLException;
 @Slf4j
 public final class PostgreSQLWalDumper extends 
AbstractIncrementalDumper<WalPosition> {
     
-    private final WalPosition walPosition;
-    
     private final DumperConfiguration dumperConfig;
     
-    private final LogicalReplication logicalReplication = new 
LogicalReplication();
+    private final WalPosition walPosition;
+    
+    private final PipelineChannel channel;
     
     private final WalEventConverter walEventConverter;
     
-    private final PipelineChannel channel;
+    private final LogicalReplication logicalReplication;
     
     public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final 
IngestPosition<WalPosition> position,
                                final PipelineChannel channel, final 
PipelineTableMetaDataLoader metaDataLoader) {
         super(dumperConfig, position, channel, metaDataLoader);
-        walPosition = (WalPosition) position;
-        if 
(!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()))
 {
-            throw new UnsupportedSQLOperationException("PostgreSQLWalDumper 
only support PipelineDataSourceConfiguration");
-        }
+        
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
+                () -> new 
UnsupportedSQLOperationException("PostgreSQLWalDumper only support 
PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
+        walPosition = (WalPosition) position;
         this.channel = channel;
         walEventConverter = new WalEventConverter(dumperConfig, 
metaDataLoader);
+        logicalReplication = new LogicalReplication();
     }
     
     @Override

Reply via email to