This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 41eaef725c5 Refactor DataSourceCheckEngine (#29462)
41eaef725c5 is described below

commit 41eaef725c5705633c5abfc0d62e8356065188ba
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Dec 20 00:26:42 2023 +0800

    Refactor DataSourceCheckEngine (#29462)
    
    * Refactor DataSourceCheckEngine
    
    * Refactor DataSourceCheckEngine
    
    * Refactor DataSourceCheckEngine
    
    * Refactor DataSourceCheckEngine
    
    * Refactor DataSourceCheckEngine
    
    * Refactor DataSourceCheckEngine
    
    * Refactor DataSourceCheckEngine
---
 .../core/checker/DataSourceCheckEngine.java        | 90 +++++++---------------
 .../datasource/DataSourceCheckEngineTest.java      | 17 ++--
 .../migration/preparer/MigrationJobPreparer.java   |  5 +-
 3 files changed, 42 insertions(+), 70 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index f9e48d9a224..e7759414322 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -31,7 +32,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Collections;
 
 /**
  * Data source check engine.
@@ -47,37 +47,13 @@ public final class DataSourceCheckEngine {
         sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
     }
     
-    /**
-     * Check source data source.
-     * 
-     * @param dataSource to be checked source data source
-     */
-    public void checkSourceDataSource(final DataSource dataSource) {
-        Collection<DataSource> dataSources = Collections.singleton(dataSource);
-        checkConnection(dataSources);
-        checkPrivilege(dataSources);
-        checkVariable(dataSources);
-    }
-    
-    /**
-     * Check target data source.
-     *
-     * @param dataSource to be checked target data source
-     * @param importerConfig importer configuration
-     */
-    public void checkTargetDataSource(final DataSource dataSource, final 
ImporterConfiguration importerConfig) {
-        Collection<DataSource> dataSources = Collections.singleton(dataSource);
-        checkConnection(dataSources);
-        checkTargetTable(dataSources, 
importerConfig.getTableAndSchemaNameMapper(), 
importerConfig.getLogicTableNames());
-    }
-    
     /**
      * Check data source connections.
      *
      * @param dataSources data sources
      * @throws PrepareJobWithInvalidConnectionException prepare job with 
invalid connection exception
      */
-    public void checkConnection(final Collection<? extends DataSource> 
dataSources) {
+    public void checkConnection(final Collection<DataSource> dataSources) {
         try {
             for (DataSource each : dataSources) {
                 each.getConnection().close();
@@ -88,22 +64,38 @@ public final class DataSourceCheckEngine {
     }
     
     /**
-     * Check table is empty.
+     * Check source data source.
+     * 
+     * @param dataSources to be checked source data source
+     */
+    public void checkSourceDataSource(final Collection<DataSource> 
dataSources) {
+        checkConnection(dataSources);
+        if (null == checker) {
+            return;
+        }
+        dataSources.forEach(checker::checkPrivilege);
+        dataSources.forEach(checker::checkVariable);
+    }
+    
+    /**
+     * Check target data sources.
      *
-     * @param dataSources data sources
-     * @param tableAndSchemaNameMapper mapping
-     * @param logicTableNames logic table names
-     * @throws PrepareJobWithInvalidConnectionException prepare job with 
invalid connection exception
+     * @param dataSources to be checked target data sources
+     * @param importerConfig importer configuration
      */
+    public void checkTargetDataSources(final Collection<DataSource> 
dataSources, final ImporterConfiguration importerConfig) {
+        checkConnection(dataSources);
+        checkTargetTable(dataSources, 
importerConfig.getTableAndSchemaNameMapper(), 
importerConfig.getLogicTableNames());
+    }
+    
     // TODO rename to common usage name
     // TODO Merge schemaName and tableNames
-    public void checkTargetTable(final Collection<? extends DataSource> 
dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final 
Collection<String> logicTableNames) {
+    private void checkTargetTable(final Collection<DataSource> dataSources, 
final TableAndSchemaNameMapper tableAndSchemaNameMapper, final 
Collection<String> logicTableNames) {
         try {
             for (DataSource each : dataSources) {
                 for (String tableName : logicTableNames) {
-                    if (!checkEmpty(each, 
tableAndSchemaNameMapper.getSchemaName(tableName), tableName)) {
-                        throw new 
PrepareJobWithTargetTableNotEmptyException(tableName);
-                    }
+                    ShardingSpherePreconditions.checkState(checkEmpty(each, 
tableAndSchemaNameMapper.getSchemaName(tableName), tableName),
+                            () -> new 
PrepareJobWithTargetTableNotEmptyException(tableName));
                 }
             }
         } catch (final SQLException ex) {
@@ -120,32 +112,4 @@ public final class DataSourceCheckEngine {
             return !resultSet.next();
         }
     }
-    
-    /**
-     * Check user privileges.
-     *
-     * @param dataSources data sources
-     */
-    public void checkPrivilege(final Collection<? extends DataSource> 
dataSources) {
-        if (null == checker) {
-            return;
-        }
-        for (DataSource each : dataSources) {
-            checker.checkPrivilege(each);
-        }
-    }
-    
-    /**
-     * Check data source variables.
-     *
-     * @param dataSources data sources
-     */
-    public void checkVariable(final Collection<? extends DataSource> 
dataSources) {
-        if (null == checker) {
-            return;
-        }
-        for (DataSource each : dataSources) {
-            checker.checkVariable(each);
-        }
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index f117cf933dd..16bf462303f 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
 import 
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
@@ -39,6 +40,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -82,20 +84,25 @@ class DataSourceCheckEngineTest {
     }
     
     @Test
-    void assertCheckTargetTable() throws SQLException {
+    void assertCheckTargetDataSources() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 
1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        dataSourceCheckEngine.checkTargetTable(dataSources, new 
TableAndSchemaNameMapper(Collections.emptyMap()), 
Collections.singletonList("t_order"));
+        ImporterConfiguration importerConfig = 
mock(ImporterConfiguration.class);
+        when(importerConfig.getTableAndSchemaNameMapper()).thenReturn(new 
TableAndSchemaNameMapper(Collections.emptyMap()));
+        
when(importerConfig.getLogicTableNames()).thenReturn(Collections.singleton("t_order"));
+        dataSourceCheckEngine.checkTargetDataSources(dataSources, 
importerConfig);
     }
     
     @Test
-    void assertCheckTargetTableFailed() throws SQLException {
+    void assertCheckTargetDataSourcesFailed() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 
1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(true);
-        assertThrows(PrepareJobWithTargetTableNotEmptyException.class,
-                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new 
TableAndSchemaNameMapper(Collections.emptyMap()), 
Collections.singletonList("t_order")));
+        ImporterConfiguration importerConfig = 
mock(ImporterConfiguration.class);
+        when(importerConfig.getTableAndSchemaNameMapper()).thenReturn(new 
TableAndSchemaNameMapper(Collections.emptyMap()));
+        
when(importerConfig.getLogicTableNames()).thenReturn(Collections.singleton("t_order"));
+        assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> 
dataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index db5e6407798..9087424ba82 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -76,6 +76,7 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
@@ -101,7 +102,7 @@ public final class MigrationJobPreparer {
                 
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration 
inventory dumper only support StandardPipelineDataSourceConfiguration"));
         DatabaseType sourceDatabaseType = 
jobItemContext.getJobConfig().getSourceDatabaseType();
-        new 
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
+        new 
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
             PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
@@ -163,7 +164,7 @@ public final class MigrationJobPreparer {
         }
         if (null == jobItemContext.getInitProgress()) {
             PipelineDataSourceWrapper targetDataSource = 
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-            new 
DataSourceCheckEngine(targetDatabaseType).checkTargetDataSource(targetDataSource,
 jobItemContext.getTaskConfig().getImporterConfig());
+            new 
DataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource),
 jobItemContext.getTaskConfig().getImporterConfig());
         }
     }
     

Reply via email to