This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2838da8ff74 Remove PipelineJobPreparer.checkSourceDataSource() and 
checkTargetDataSource() (#29460)
2838da8ff74 is described below

commit 2838da8ff74bfd0127647f4b9880a206a7b0bc73
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 19 23:25:44 2023 +0800

    Remove PipelineJobPreparer.checkSourceDataSource() and 
checkTargetDataSource() (#29460)
    
    * Remove PipelineJobPreparer.checkSourceDataSource()
    
    * Remove PipelineJobPreparer.checkSourceDataSource()
    
    * Remove PipelineJobPreparer.checkTargetDataSource()
---
 .../core/checker/DataSourceCheckEngine.java        | 32 +++++++++++++++++--
 .../core/preparer/PipelineJobPreparer.java         | 36 +---------------------
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  5 ++-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  5 +--
 .../migration/preparer/MigrationJobPreparer.java   | 22 ++++++-------
 5 files changed, 46 insertions(+), 54 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index 89dbd1d2b75..f9e48d9a224 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -17,12 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.core.checker;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -30,6 +31,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Data source check engine.
@@ -45,6 +47,30 @@ public final class DataSourceCheckEngine {
         sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
     }
     
+    /**
+     * Check source data source.
+     * 
+     * @param dataSource to be checked source data source
+     */
+    public void checkSourceDataSource(final DataSource dataSource) {
+        Collection<DataSource> dataSources = Collections.singleton(dataSource);
+        checkConnection(dataSources);
+        checkPrivilege(dataSources);
+        checkVariable(dataSources);
+    }
+    
+    /**
+     * Check target data source.
+     *
+     * @param dataSource to be checked target data source
+     * @param importerConfig importer configuration
+     */
+    public void checkTargetDataSource(final DataSource dataSource, final 
ImporterConfiguration importerConfig) {
+        Collection<DataSource> dataSources = Collections.singleton(dataSource);
+        checkConnection(dataSources);
+        checkTargetTable(dataSources, 
importerConfig.getTableAndSchemaNameMapper(), 
importerConfig.getLogicTableNames());
+    }
+    
     /**
      * Check data source connections.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
index 7b6494a7686..792f581f092 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
@@ -22,10 +22,8 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
@@ -38,7 +36,6 @@ import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSour
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Optional;
 
 /**
@@ -54,7 +51,7 @@ public final class PipelineJobPreparer {
      * Get incremental position.
      *
      * @param initIncremental init incremental
-     * @param dumperContext dumper config
+     * @param dumperContext incremental dumper context
      * @param dataSourceManager data source manager
      * @return ingest position
      * @throws SQLException SQL exception
@@ -71,37 +68,6 @@ public final class PipelineJobPreparer {
         return DatabaseTypedSPILoader.getService(PositionInitializer.class, 
databaseType).init(dataSource, dumperContext.getJobId());
     }
     
-    /**
-     * Check data source.
-     *
-     * @param dataSources data source
-     */
-    public void checkSourceDataSource(final Collection<? extends DataSource> 
dataSources) {
-        if (dataSources.isEmpty()) {
-            return;
-        }
-        DataSourceCheckEngine checkEngine = new 
DataSourceCheckEngine(databaseType);
-        checkEngine.checkConnection(dataSources);
-        checkEngine.checkPrivilege(dataSources);
-        checkEngine.checkVariable(dataSources);
-    }
-    
-    /**
-     * Check target data source.
-     *
-     * @param importerConfig importer config
-     * @param targetDataSources target data sources
-     */
-    public void checkTargetDataSource(final ImporterConfiguration 
importerConfig, final Collection<? extends DataSource> targetDataSources) {
-        if (null == targetDataSources || targetDataSources.isEmpty()) {
-            log.info("target data source is empty, skip check");
-            return;
-        }
-        DataSourceCheckEngine dataSourceCheckEngine = new 
DataSourceCheckEngine(databaseType);
-        dataSourceCheckEngine.checkConnection(targetDataSources);
-        dataSourceCheckEngine.checkTargetTable(targetDataSources, 
importerConfig.getTableAndSchemaNameMapper(), 
importerConfig.getLogicTableNames());
-    }
-    
     /**
      * Cleanup job preparer.
      *
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 04ef66dcac7..a177080d867 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -196,9 +196,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
                 jobConfig.getJobId(), jobConfig.isDecodeWithTX());
     }
     
-    private static TransmissionJobItemProgress 
getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig,
-                                                                              
final PipelineDataSourceManager dataSourceManager,
-                                                                              
final IncrementalDumperContext incrementalDumperContext) throws SQLException {
+    private TransmissionJobItemProgress getTransmissionJobItemProgress(final 
CDCJobConfiguration jobConfig, final PipelineDataSourceManager 
dataSourceManager,
+                                                                       final 
IncrementalDumperContext incrementalDumperContext) throws SQLException {
         TransmissionJobItemProgress result = new TransmissionJobItemProgress();
         result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
         
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index f22418276c4..c9da76d64ba 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -105,8 +105,9 @@ public final class CDCJobPreparer {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         JobItemIncrementalTasksProgress initIncremental = null == 
jobItemContext.getInitProgress() ? null : 
jobItemContext.getInitProgress().getIncremental();
         try {
-            taskConfig.getDumperContext().getCommonContext().setPosition(new 
PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
-                    .getIncrementalPosition(initIncremental, 
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+            DatabaseType databaseType = 
taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
+            IngestPosition position = new 
PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, 
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+            
taskConfig.getDumperContext().getCommonContext().setPosition(position);
         } catch (final SQLException ex) {
             throw new 
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
         }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index ddbb5a3a416..db5e6407798 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
@@ -34,6 +35,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChann
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
@@ -74,7 +76,6 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
@@ -100,7 +101,7 @@ public final class MigrationJobPreparer {
                 
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration 
inventory dumper only support StandardPipelineDataSourceConfiguration"));
         DatabaseType sourceDatabaseType = 
jobItemContext.getJobConfig().getSourceDatabaseType();
-        new 
PipelineJobPreparer(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
+        new 
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
         if (jobItemContext.isStopping()) {
             PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
@@ -156,21 +157,19 @@ public final class MigrationJobPreparer {
     }
     
     private void prepareAndCheckTarget(final MigrationJobItemContext 
jobItemContext) throws SQLException {
+        DatabaseType targetDatabaseType = 
jobItemContext.getJobConfig().getTargetDatabaseType();
         if (jobItemContext.isSourceTargetDatabaseTheSame()) {
-            prepareTarget(jobItemContext);
+            prepareTarget(jobItemContext, targetDatabaseType);
         }
-        TransmissionJobItemProgress initProgress = 
jobItemContext.getInitProgress();
-        if (null == initProgress) {
+        if (null == jobItemContext.getInitProgress()) {
             PipelineDataSourceWrapper targetDataSource = 
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-            new 
PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSource(
-                    jobItemContext.getTaskConfig().getImporterConfig(), 
Collections.singleton(targetDataSource));
+            new 
DataSourceCheckEngine(targetDatabaseType).checkTargetDataSource(targetDataSource,
 jobItemContext.getTaskConfig().getImporterConfig());
         }
     }
     
-    private void prepareTarget(final MigrationJobItemContext jobItemContext) 
throws SQLException {
+    private void prepareTarget(final MigrationJobItemContext jobItemContext, 
final DatabaseType targetDatabaseType) throws SQLException {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         Collection<CreateTableConfiguration> createTableConfigs = 
jobItemContext.getTaskConfig().getCreateTableConfigurations();
-        DatabaseType targetDatabaseType = 
jobItemContext.getJobConfig().getTargetDatabaseType();
         PipelineDataSourceManager dataSourceManager = 
jobItemContext.getDataSourceManager();
         PipelineJobDataSourcePreparer preparer = new 
PipelineJobDataSourcePreparer(DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
 targetDatabaseType));
         preparer.prepareTargetSchemas(new 
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, 
dataSourceManager));
@@ -184,8 +183,9 @@ public final class MigrationJobPreparer {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         JobItemIncrementalTasksProgress initIncremental = null == 
jobItemContext.getInitProgress() ? null : 
jobItemContext.getInitProgress().getIncremental();
         try {
-            taskConfig.getDumperContext().getCommonContext().setPosition(new 
PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
-                    .getIncrementalPosition(initIncremental, 
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+            DatabaseType databaseType = 
taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
+            IngestPosition position = new 
PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, 
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+            
taskConfig.getDumperContext().getCommonContext().setPosition(position);
         } catch (final SQLException ex) {
             throw new 
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
         }

Reply via email to